Browse Source

make robust against error

* plug/unplug is handled well even if there are something half done.
* serialize plug/unplug.
* add cache management for lagopus resources
* output dsl when lagopus resources changed (not completed)
* many cleanups

Change-Id: I286e55ac93984ddf13b823474daef8eff5de67ba
changes/54/512454/9
Itsuro Oda 1 year ago
parent
commit
bf2a2582ad

+ 29
- 7
networking_lagopus/agent/interface.py View File

@@ -46,19 +46,35 @@ class LagopusInterfaceDriver(n_interface.LinuxInterfaceDriver):
46 46
                     'network_type': details['network_type'],
47 47
                     'segmentation_id': details['segmentation_id']}
48 48
 
49
+        raise RuntimeError("Failed to get segment for port %s" % port_id)
50
+
49 51
     def _disable_tcp_offload(self, namespace, device_name):
50 52
         ip_wrapper = ip_lib.IPWrapper(namespace)
51 53
         cmd = ['ethtool', '-K', device_name, 'tx', 'off', 'tso', 'off']
52 54
         ip_wrapper.netns.execute(cmd)
53 55
 
56
+    def plug(self, network_id, port_id, device_name, mac_address,
57
+             bridge=None, namespace=None, prefix=None, mtu=None):
58
+        # override this method because there are some tasks to be done
59
+        # regardless of whether the interface exists.
60
+        # note that plug_new must be implemented because it is
61
+        # an abstractmethod.
62
+        self.plug_new(network_id, port_id, device_name, mac_address,
63
+                      bridge, namespace, prefix, mtu)
64
+
54 65
     def plug_new(self, network_id, port_id, device_name, mac_address,
55 66
                  bridge=None, namespace=None, prefix=None, mtu=None):
56 67
         """Plugin the interface."""
57 68
         ip = ip_lib.IPWrapper()
58 69
         tap_name = device_name.replace(prefix or self.DEV_NAME_PREFIX,
59 70
                                        n_const.TAP_DEVICE_PREFIX)
60
-        root_veth, ns_veth = ip.add_veth(tap_name, device_name,
61
-                                         namespace2=namespace)
71
+        if ip_lib.device_exists(device_name, namespace=namespace):
72
+            LOG.info("Device %s already exists", device_name)
73
+            root_veth, ns_veth = n_interface._get_veth(tap_name, device_name,
74
+                                                       namespace)
75
+        else:
76
+            root_veth, ns_veth = ip.add_veth(tap_name, device_name,
77
+                                             namespace2=namespace)
62 78
         root_veth.disable_ipv6()
63 79
         ns_veth.link.set_address(mac_address)
64 80
 
@@ -72,12 +88,14 @@ class LagopusInterfaceDriver(n_interface.LinuxInterfaceDriver):
72 88
         ns_veth.link.set_up()
73 89
         self._disable_tcp_offload(namespace, device_name)
74 90
 
75
-        # do first and error check
76 91
         segment = self._get_network_segment(port_id)
77
-
78 92
         self.lagopus_api.plug_rawsock(self.context, tap_name, segment)
79
-        self.plugin_api.update_device_up(self.context, port_id,
80
-                                         self.agent_id, self.host)
93
+        try:
94
+            self.plugin_api.update_device_up(self.context, port_id,
95
+                                             self.agent_id, self.host)
96
+        except RuntimeError as e:
97
+            # the error is not critical. contiune.
98
+            LOG.warning("Failed to update_device_up: %s", e)
81 99
 
82 100
     def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
83 101
         """Unplug the interface."""
@@ -85,8 +103,12 @@ class LagopusInterfaceDriver(n_interface.LinuxInterfaceDriver):
85 103
         tap_name = device_name.replace(prefix or self.DEV_NAME_PREFIX,
86 104
                                        n_const.TAP_DEVICE_PREFIX)
87 105
         try:
88
-            self.lagopus_api.unplug_rawsock(self.context, tap_name)
89 106
             device.link.delete()
107
+        except RuntimeError:
108
+            # note that the interface may not exist.
109
+            LOG.error("Failed deleting interface '%s'", device_name)
110
+        try:
111
+            self.lagopus_api.unplug_rawsock(self.context, tap_name)
90 112
             LOG.debug("Unplugged interface '%s'", device_name)
91 113
         except RuntimeError:
92 114
             LOG.error("Failed unplugging interface '%s'",

+ 360
- 110
networking_lagopus/agent/lagopus_lib.py View File

@@ -10,123 +10,373 @@
10 10
 #    License for the specific language governing permissions and limitations
11 11
 #    under the License.
12 12
 
13
-import socket
13
+import eventlet
14 14
 
15
-from neutron_lib import constants as n_const
16
-from oslo_log import helpers as log_helpers
17 15
 from oslo_log import log as logging
16
+from ryu.app.ofctl import api as ofctl_api
18 17
 
19 18
 from networking_lagopus.agent import lagosh
20 19
 
21 20
 LOG = logging.getLogger(__name__)
22
-SOCKET_ISSUE = "Socket connection refused.  Lagopus is not running?\n"
23 21
 
22
+OFPP_MAX = 0xffffff00
24 23
 
25
-class LagopusCommand(object):
24
+DEVICE_TYPE_PHYS = "ethernet-dpdk-phy"
25
+DEVICE_TYPE_RAWSOCK = "ethernet-rawsock"
26 26
 
27
-    def _lagosh(self, cmd=None):
28
-        if not cmd:
27
+INTERFACE_TYPE_VHOST = "vhost"
28
+INTERFACE_TYPE_PIPE = "pipe"
29
+INTERFACE_TYPE_PHYS = "phys"
30
+INTERFACE_TYPE_RAWSOCK = "rawsock"
31
+
32
+BRIDGE_TYPE_PHYS = "phys"
33
+BRIDGE_TYPE_VLAN = "vlan"
34
+
35
+_config_change_callback = None
36
+
37
+
38
+def register_config_change_callback(callback):
39
+    global _config_change_callback
40
+    _config_change_callback = callback
41
+
42
+
43
+def config_changed():
44
+    global _config_change_callback
45
+    if _config_change_callback:
46
+        _config_change_callback()
47
+
48
+
49
+class LagopusResource(object):
50
+
51
+    resource = None
52
+
53
+    def __init__(self, name):
54
+        self.name = name
55
+
56
+    def create_param_str(self):
57
+        return ""
58
+
59
+    def create_str(self):
60
+        cmd = "%s %s create" % (self.resource, self.name)
61
+        param = self.create_param_str()
62
+        if param:
63
+            cmd += " %s\n" % param
64
+        else:
65
+            cmd += "\n"
66
+        return cmd
67
+
68
+    def _exec(self, cmd):
69
+        LOG.debug("lagopus cmd executed: %s", cmd.rstrip())
70
+        return lagosh.ds_client().call(cmd)
71
+
72
+    def create(self):
73
+        self._exec(self.create_str())
74
+
75
+    def destroy(self):
76
+        cmd = "%s %s destroy\n" % (self.resource, self.name)
77
+        self._exec(cmd)
78
+
79
+    @classmethod
80
+    def show(cls):
81
+        cmd = "%s\n" % cls.resource
82
+        return lagosh.ds_client().call(cmd)
83
+
84
+    @classmethod
85
+    def mk_name(cls):
86
+        return "unknown"
87
+
88
+
89
+class LagopusChannel(LagopusResource):
90
+
91
+    resource = "channel"
92
+
93
+    def __init__(self, name):
94
+        super(LagopusChannel, self).__init__(name)
95
+
96
+    def create_param_str(self):
97
+        return "-dst-addr 127.0.0.1 -protocol tcp"
98
+
99
+    @classmethod
100
+    def mk_name(cls, bridge):
101
+        # channel name convention: "ch-" + bridge name
102
+        return "ch-%s" % bridge
103
+
104
+
105
+class LagopusController(LagopusResource):
106
+
107
+    resource = "controller"
108
+
109
+    def __init__(self, name, channel):
110
+        super(LagopusController, self).__init__(name)
111
+        self.channel = channel
112
+
113
+    def create_param_str(self):
114
+        return "-channel %s -role equal -connection-type main" % self.channel
115
+
116
+    @classmethod
117
+    def mk_name(cls, bridge):
118
+        # controller name convention: "con-" + bridge name
119
+        return "con-%s" % bridge
120
+
121
+
122
+class LagopusInterface(LagopusResource):
123
+
124
+    resource = "interface"
125
+
126
+    def __init__(self, name, dev_type, device, port_number=0):
127
+        super(LagopusInterface, self).__init__(name)
128
+        self.dev_type = dev_type
129
+        self.device = device
130
+        self.port_number = port_number
131
+        self.type = self._get_interface_type()
132
+        self.id = self._get_id_for_type()
133
+        self.is_used = False
134
+
135
+    def used(self):
136
+        self.is_used = True
137
+
138
+    def unused(self):
139
+        self.is_used = False
140
+
141
+    def _get_interface_type(self):
142
+        if self.dev_type == DEVICE_TYPE_PHYS:
143
+            if self.device.startswith("eth_vhost"):
144
+                return INTERFACE_TYPE_VHOST
145
+            elif self.device.startswith("eth_pipe"):
146
+                return INTERFACE_TYPE_PIPE
147
+            else:  # device == ""
148
+                return INTERFACE_TYPE_PHYS
149
+        else:  # dev_type == DEVICE_TYPE_RAWSOCK
150
+            return INTERFACE_TYPE_RAWSOCK
151
+
152
+    def _get_id_for_type(self):
153
+        if self.type == INTERFACE_TYPE_VHOST:
154
+            return int(self.device.split(',')[0][len("eth_vhost"):])
155
+        elif self.type == INTERFACE_TYPE_PIPE:
156
+            return int(self.device.split(',')[0][len("eth_pipe"):])
157
+
158
+    def create_param_str(self):
159
+        type_str = "-type %s " % self.dev_type
160
+        if self.type == INTERFACE_TYPE_PHYS:
161
+            param_str = "-port-number %d" % self.port_number
162
+        else:
163
+            param_str = "-device %s" % self.device
164
+        return type_str + param_str
165
+
166
+    @classmethod
167
+    def mk_name(cls, interface_type, name_key):
168
+        # interface name convention:
169
+        #   vhost:         "vhost_" + name_key(==vhost_id)
170
+        #   pipe:          "pipe-" + name_key(==pipe_id)
171
+        #   else(rawsock): "i" + name_key(==device)
172
+        prefix = "i"
173
+        if interface_type == INTERFACE_TYPE_VHOST:
174
+            prefix = "vhost_"
175
+        elif interface_type == INTERFACE_TYPE_PIPE:
176
+            prefix = "pipe-"
177
+
178
+        return prefix + str(name_key)
179
+
180
+
181
+class LagopusPort(LagopusResource):
182
+
183
+    resource = "port"
184
+
185
+    def __init__(self, name, interface):
186
+        super(LagopusPort, self).__init__(name)
187
+        self.interface = interface
188
+        # Used by bridge when port is added to bridge
189
+        self.bridge = None
190
+        self.ofport = None
191
+
192
+    def create_param_str(self):
193
+        return "-interface %s" % self.interface.name
194
+
195
+    def add_bridge_str(self):
196
+        if self.bridge is not None:
197
+            return ("bridge %s config -port %s %s\n" %
198
+                    (self.bridge.name, self.name, self.ofport))
199
+
200
+    def create(self):
201
+        super(LagopusPort, self).create()
202
+        self.interface.used()
203
+
204
+    def destroy(self):
205
+        super(LagopusPort, self).destroy()
206
+        self.interface.unused()
207
+
208
+    @classmethod
209
+    def mk_name(cls, interface_type, name_key):
210
+        # port name convention:
211
+        #   vhost:         name_key(==port_id)
212
+        #   pipe:          "p-" + name_key(==pipe interface name)
213
+        #   else(rawsock): "p" + name_key(==device)
214
+        if interface_type == INTERFACE_TYPE_VHOST:
215
+            return name_key
216
+        elif interface_type == INTERFACE_TYPE_PIPE:
217
+            return "p-" + name_key
218
+        else:
219
+            return "p" + name_key
220
+
221
+
222
+class LagopusBridge(LagopusResource):
223
+
224
+    resource = "bridge"
225
+
226
+    def __init__(self, name, ryu_app, controller, dpid,
227
+                 b_type=BRIDGE_TYPE_VLAN, is_enabled=False):
228
+        super(LagopusBridge, self).__init__(name)
229
+        self.ryu_app = ryu_app
230
+        self.controller = controller
231
+        self.dpid = dpid
232
+        self.type = b_type
233
+        self.is_enabled = is_enabled
234
+
235
+        self.max_ofport = 0
236
+        self.used_ofport = []
237
+        self.pipe_id = None
238
+
239
+        if is_enabled:
240
+            self.initialize()
241
+
242
+    def create(self):
243
+        super(LagopusBridge, self).create()
244
+        self.enable()
245
+
246
+    def initialize(self):
247
+        self.installed_vlan = []
248
+        self.datapath = self._get_datapath()
249
+        self.install_normal()
250
+
251
+        self.dump_flows()  # just for debug
252
+
253
+    def _get_datapath(self):
254
+        # TODO(hichihara): set timeout
255
+        # NOTE: basically it is OK because lagopus is running
256
+        # and dpid exists at this point. so the call shoud be
257
+        # success.
258
+        while True:
259
+            dp = ofctl_api.get_datapath(self.ryu_app, self.dpid)
260
+            if dp is not None:
261
+                return dp
262
+            # lagopus switch dose not establish connection yet.
263
+            # wait a while
264
+            eventlet.sleep(1)
265
+
266
+    def install_normal(self):
267
+        ofp = self.datapath.ofproto
268
+        ofpp = self.datapath.ofproto_parser
269
+
270
+        actions = [ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0)]
271
+        instructions = [ofpp.OFPInstructionActions(
272
+                        ofp.OFPIT_APPLY_ACTIONS, actions)]
273
+        msg = ofpp.OFPFlowMod(self.datapath,
274
+                              table_id=0,
275
+                              priority=0,
276
+                              instructions=instructions)
277
+        # TODO(hichihara): error handling
278
+        ofctl_api.send_msg(self.ryu_app, msg)
279
+
280
+    def install_vlan(self, vlan_id, port):
281
+        if vlan_id in self.installed_vlan:
29 282
             return
30
-        lagosh_client = lagosh.ds_client()
31
-        try:
32
-            return lagosh_client.call(cmd)
33
-        except socket.error:
34
-            LOG.debug("_lagosh socket error")
35
-        except lagosh.DSLError as e:
36
-            LOG.debug("_lagosh DSLError cmd: %s, error: %s", cmd, e)
37
-
38
-    def show_interfaces(self):
39
-        cmd = "interface\n"
40
-        return self._lagosh(cmd)
41
-
42
-    def show_ports(self):
43
-        cmd = "port\n"
44
-        return self._lagosh(cmd)
45
-
46
-    def show_bridges(self):
47
-        cmd = "bridge\n"
48
-        return self._lagosh(cmd)
49
-
50
-    def show_channels(self):
51
-        cmd = "channel\n"
52
-        return self._lagosh(cmd)
53
-
54
-    def show_controllers(self):
55
-        cmd = "controller\n"
56
-        return self._lagosh(cmd)
57
-
58
-    @log_helpers.log_method_call
59
-    def create_channel(self, name):
60
-        cmd = "channel %s create -dst-addr 127.0.0.1 -protocol tcp\n" % name
61
-        self._lagosh(cmd)
62
-
63
-    @log_helpers.log_method_call
64
-    def create_controller(self, name, channel):
65
-        cmd = ("controller %s create -channel %s -role equal "
66
-               "-connection-type main\n") % (name, channel)
67
-        self._lagosh(cmd)
68
-
69
-    @log_helpers.log_method_call
70
-    def create_bridge(self, name, controller, dpid):
71
-        cmd = ("bridge %s create -controller %s -dpid %d "
72
-               "-l2-bridge True -mactable-ageing-time 300 "
73
-               "-mactable-max-entries 8192\n") % (name, controller, dpid)
74
-        self._lagosh(cmd)
75
-        cmd = "bridge %s enable\n" % name
76
-        self._lagosh(cmd)
77
-
78
-    # TODO(hichihara): unify create_*_interface
79
-    @log_helpers.log_method_call
80
-    def create_vhost_interface(self, name, device):
81
-        cmd = ("interface %s create -type ethernet-dpdk-phy "
82
-               "-device %s\n") % (name, device)
83
-        self._lagosh(cmd)
84
-
85
-    def create_pipe_interface(self, name, device):
86
-        self.create_vhost_interface(name, device)
87
-
88
-    @log_helpers.log_method_call
89
-    def create_rawsock_interface(self, name, device):
90
-        cmd = ("interface %s create -type ethernet-rawsock "
91
-               "-device %s\n") % (name, device)
92
-        self._lagosh(cmd)
93
-
94
-    @log_helpers.log_method_call
95
-    def create_port(self, port, interface):
96
-        cmd = "port %s create -interface %s\n" % (port, interface)
97
-        self._lagosh(cmd)
98
-
99
-    @log_helpers.log_method_call
100
-    def destroy_port(self, port):
101
-        cmd = "port %s destroy\n" % port
102
-        self._lagosh(cmd)
103
-
104
-    @log_helpers.log_method_call
105
-    def destroy_interface(self, interface):
106
-        cmd = "interface %s destroy\n" % interface
107
-        self._lagosh(cmd)
108
-
109
-    @log_helpers.log_method_call
110
-    def bridge_add_port(self, bridge_name, port_name, ofport):
111
-        cmd = ("bridge %s config -port %s %d\n" %
112
-               (bridge_name, port_name, ofport))
113
-        self._lagosh(cmd)
114
-
115
-    @log_helpers.log_method_call
116
-    def bridge_del_port(self, bridge_name, port_name):
117
-        cmd = "bridge %s config -port -%s\n" % (bridge_name, port_name)
118
-        self._lagosh(cmd)
119
-
120
-    def find_bridge_port(self, port_id, bridge_name=None):
121
-        if port_id.startswith(n_const.TAP_DEVICE_PREFIX):
122
-            port_id = port_id[len(n_const.TAP_DEVICE_PREFIX):]
123
-        bridges = self.show_bridges()
124
-        for bridge in bridges:
125
-            if bridge_name and bridge["name"] != bridge_name:
126
-                continue
127
-            ports = bridge["ports"]
128
-            for port in ports:
129
-                port_name = port[1:]
130
-                if port_name.startswith(port_id):
131
-                    return bridge["name"], port_name
132
-        return None, None
283
+        ofport = port.ofport
284
+        ofp = self.datapath.ofproto
285
+        ofpp = self.datapath.ofproto_parser
286
+
287
+        # pipe port -> phys port: push vlan, output:1
288
+        match = ofpp.OFPMatch(in_port=ofport)
289
+        vlan_vid = vlan_id | ofp.OFPVID_PRESENT
290
+        actions = [ofpp.OFPActionPushVlan(),
291
+                   ofpp.OFPActionSetField(vlan_vid=vlan_vid),
292
+                   ofpp.OFPActionOutput(1, 0)]
293
+        instructions = [ofpp.OFPInstructionActions(
294
+                        ofp.OFPIT_APPLY_ACTIONS, actions)]
295
+        msg = ofpp.OFPFlowMod(self.datapath,
296
+                              table_id=0,
297
+                              priority=2,
298
+                              match=match,
299
+                              instructions=instructions)
300
+        # TODO(hichihara): error handling
301
+        ofctl_api.send_msg(self.ryu_app, msg)
302
+
303
+        # phys port -> pipe port: pop vlan, output:<ofport>
304
+        vlan_vid = vlan_id | ofp.OFPVID_PRESENT
305
+        match = ofpp.OFPMatch(in_port=1, vlan_vid=vlan_vid)
306
+        actions = [ofpp.OFPActionPopVlan(),
307
+                   ofpp.OFPActionOutput(ofport, 0)]
308
+        instructions = [ofpp.OFPInstructionActions(
309
+                        ofp.OFPIT_APPLY_ACTIONS, actions)]
310
+        msg = ofpp.OFPFlowMod(self.datapath,
311
+                              table_id=0,
312
+                              priority=2,
313
+                              match=match,
314
+                              instructions=instructions)
315
+        # TODO(hichihara): error handling
316
+        ofctl_api.send_msg(self.ryu_app, msg)
317
+
318
+        self.installed_vlan.append(vlan_id)
319
+
320
+    def dump_flows(self):
321
+        ofpp = self.datapath.ofproto_parser
322
+        msg = ofpp.OFPFlowStatsRequest(self.datapath)
323
+        reply_cls = ofpp.OFPFlowStatsReply
324
+        # TODO(hichihara): error handling
325
+        result = ofctl_api.send_msg(self.ryu_app, msg, reply_cls=reply_cls,
326
+                                    reply_multi=True)
327
+        LOG.debug("%s flows: %s", self.name, result)
328
+
329
+    def get_ofport(self):
330
+        if self.max_ofport < OFPP_MAX:
331
+            return self.max_ofport + 1
332
+        else:
333
+            for ofport in xrange(1, OFPP_MAX + 1):
334
+                if ofport not in self.used_ofport:
335
+                    return ofport
336
+
337
+    def add_port(self, port, ofport):
338
+        self.used_ofport.append(ofport)
339
+        self.max_ofport = max(self.max_ofport, ofport)
340
+        port.ofport = ofport
341
+        port.bridge = self
342
+        if (self.type == BRIDGE_TYPE_VLAN and
343
+                port.interface.type == INTERFACE_TYPE_PIPE):
344
+            self.pipe_id = port.interface.id
345
+
346
+    def del_port(self, port):
347
+        self.used_ofport.remove(port.ofport)
348
+        port.bridge = None
349
+        port.ofport = None
350
+
351
+    def create_param_str(self):
352
+        param = ("-controller %s -dpid %d "
353
+                 "-l2-bridge True -mactable-ageing-time 300 "
354
+                 "-mactable-max-entries 8192") % (self.controller,
355
+                                                  self.dpid)
356
+        return param
357
+
358
+    def enable_str(self):
359
+        return "bridge %s enable\n" % self.name
360
+
361
+    def enable(self):
362
+        self._exec(self.enable_str())
363
+        self.is_enabled = True
364
+        self.initialize()
365
+
366
+    def bridge_add_port(self, port, ofport):
367
+        cmd = ("bridge %s config -port %s %s\n" %
368
+               (self.name, port.name, ofport))
369
+        self._exec(cmd)
370
+        self.add_port(port, ofport)
371
+        config_changed()
372
+
373
+    def bridge_del_port(self, port):
374
+        cmd = "bridge %s config -port -%s\n" % (self.name, port.name)
375
+        self._exec(cmd)
376
+        self.del_port(port)
377
+
378
+    @classmethod
379
+    def mk_name(cls, phys_net, vlan_id):
380
+        # bridge name convention: "phys_net"_"vlan_id"
381
+        # this is used for vlan bridge only.
382
+        return "%s_%d" % (phys_net, vlan_id)

+ 1
- 4
networking_lagopus/agent/lagosh.py View File

@@ -232,10 +232,7 @@ class ds_client(object):
232 232
         self.request('unlock\n')
233 233
 
234 234
     def call(self, arg, response=True):
235
-        try:
236
-            self.open()
237
-        except Exception:
238
-            raise
235
+        self.open()
239 236
         self.write(arg)
240 237
         if response:
241 238
             jdata = self.read()

+ 2
- 1
networking_lagopus/agent/rpc.py View File

@@ -54,5 +54,6 @@ class LagopusAgentApi(object):
54 54
 
55 55
     def unplug_vhost(self, context, port_id, host=None):
56 56
         cctxt = self._get_context(host)
57
-        return cctxt.call(context, 'unplug_vhost',
57
+        # asynchronous
58
+        return cctxt.cast(context, 'unplug_vhost',
58 59
                           port_id=port_id)

+ 279
- 328
networking_lagopus/ml2/agent/lagopus_agent.py View File

@@ -12,6 +12,7 @@
12 12
 
13 13
 import eventlet
14 14
 import os
15
+import socket
15 16
 import sys
16 17
 
17 18
 from neutron_lib import constants
@@ -23,8 +24,8 @@ from oslo_log import log as logging
23 24
 from oslo_service import loopingcall
24 25
 from oslo_service import service
25 26
 from osprofiler import profiler
26
-from ryu.app.ofctl import api as ofctl_api
27 27
 
28
+from neutron.agent.linux import ip_lib
28 29
 from neutron.agent import rpc as agent_rpc
29 30
 from neutron.api.rpc.callbacks import resources
30 31
 from neutron.common import config as common_config
@@ -32,7 +33,7 @@ from neutron.common import rpc as n_rpc
32 33
 from neutron.common import topics
33 34
 from neutron.plugins.ml2.drivers.agent import config as agent_config  # noqa
34 35
 
35
-from networking_lagopus.agent import lagopus_lib
36
+from networking_lagopus.agent import lagopus_lib as lg_lib
36 37
 from networking_lagopus.common import config  # noqa
37 38
 
38 39
 
@@ -41,378 +42,328 @@ LOG = logging.getLogger(__name__)
41 42
 LAGOPUS_AGENT_BINARY = 'neutron-lagopus-agent'
42 43
 AGENT_TYPE_LAGOPUS = 'Lagopus agent'
43 44
 MAX_WAIT_LAGOPUS_RETRY = 5
44
-OFPP_MAX = 0xffffff00
45 45
 
46 46
 
47
-class LagopusBridge(object):
47
+class LagopusCache(dict):
48 48
 
49
-    def __init__(self, ryu_app, name, dpid, port_data):
50
-        LOG.debug("LagopusBridge: %s %s", name, dpid)
51
-        self.ryu_app = ryu_app
52
-        self.name = name
53
-        self.lagopus_client = lagopus_lib.LagopusCommand()
54
-
55
-        self.used_ofport = []
56
-        self.max_ofport = 0
57
-        self.port_mappings = {}
58
-
59
-        if port_data:
60
-            for port, ofport in port_data.items():
61
-                # remove ':'
62
-                port_name = port[1:]
63
-                self.port_mappings[port_name] = ofport
64
-                self.used_ofport.append(ofport)
65
-            if self.used_ofport:
66
-                self.max_ofport = max(self.used_ofport)
67
-
68
-        LOG.debug("used_ofport: %s, max_ofport: %d",
69
-                  self.used_ofport, self.max_ofport)
70
-
71
-        self.dpid = dpid
72
-        self.datapath = self._get_datapath()
73
-        self.install_normal()
74
-        # just for debug
75
-        self.dump_flows()
76
-        return
77
-
78
-    def _get_datapath(self):
79
-        # TODO(hichihara): set timeout
80
-        # NOTE: basically it is OK because lagopus is running
81
-        # and dpid exists at this point. so the call shoud be
82
-        # success.
83
-        while True:
84
-            dp = ofctl_api.get_datapath(self.ryu_app, self.dpid)
85
-            if dp is not None:
86
-                return dp
87
-            # lagopus switch dose not establish connection yet.
88
-            # wait a while
89
-            eventlet.sleep(1)
90
-
91
-    def install_normal(self):
92
-        ofp = self.datapath.ofproto
93
-        ofpp = self.datapath.ofproto_parser
94
-
95
-        actions = [ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0)]
96
-        instructions = [ofpp.OFPInstructionActions(
97
-                        ofp.OFPIT_APPLY_ACTIONS, actions)]
98
-        msg = ofpp.OFPFlowMod(self.datapath,
99
-                              table_id=0,
100
-                              priority=0,
101
-                              instructions=instructions)
102
-        # TODO(hichihara): error handling
103
-        ofctl_api.send_msg(self.ryu_app, msg)
104
-
105
-    def install_vlan(self, vlan_id, port_name):
106
-        ofport = self.port_mappings[port_name]
107
-        ofp = self.datapath.ofproto
108
-        ofpp = self.datapath.ofproto_parser
109
-
110
-        # pipe port -> phys port: push vlan, output:1
111
-        match = ofpp.OFPMatch(in_port=ofport)
112
-        vlan_vid = vlan_id | ofp.OFPVID_PRESENT
113
-        actions = [ofpp.OFPActionPushVlan(),
114
-                   ofpp.OFPActionSetField(vlan_vid=vlan_vid),
115
-                   ofpp.OFPActionOutput(1, 0)]
116
-        instructions = [ofpp.OFPInstructionActions(
117
-                        ofp.OFPIT_APPLY_ACTIONS, actions)]
118
-        msg = ofpp.OFPFlowMod(self.datapath,
119
-                              table_id=0,
120
-                              priority=2,
121
-                              match=match,
122
-                              instructions=instructions)
123
-        # TODO(hichihara): error handling
124
-        ofctl_api.send_msg(self.ryu_app, msg)
125
-
126
-        # phys port -> pipe port: pop vlan, output:<ofport>
127
-        vlan_vid = vlan_id | ofp.OFPVID_PRESENT
128
-        match = ofpp.OFPMatch(in_port=1, vlan_vid=vlan_vid)
129
-        actions = [ofpp.OFPActionPopVlan(),
130
-                   ofpp.OFPActionOutput(ofport, 0)]
131
-        instructions = [ofpp.OFPInstructionActions(
132
-                        ofp.OFPIT_APPLY_ACTIONS, actions)]
133
-        msg = ofpp.OFPFlowMod(self.datapath,
134
-                              table_id=0,
135
-                              priority=2,
136
-                              match=match,
137
-                              instructions=instructions)
138
-        # TODO(hichihara): error handling
139
-        ofctl_api.send_msg(self.ryu_app, msg)
140
-
141
-    def dump_flows(self):
142
-        ofpp = self.datapath.ofproto_parser
143
-        msg = ofpp.OFPFlowStatsRequest(self.datapath)
144
-        reply_cls = ofpp.OFPFlowStatsReply
145
-        # TODO(hichihara): error handling
146
-        result = ofctl_api.send_msg(self.ryu_app, msg, reply_cls=reply_cls,
147
-                                    reply_multi=True)
148
-        LOG.debug("%s flows: %s", self.name, result)
149
-
150
-    def get_ofport(self):
151
-        if self.max_ofport < OFPP_MAX:
152
-            self.max_ofport += 1
153
-            self.used_ofport.append(self.max_ofport)
154
-            return self.max_ofport
155
-        for num in range(1, OFPP_MAX + 1):
156
-            if num not in self.used_ofport:
157
-                self.used_ofport.append(num)
158
-                return num
159
-
160
-    def free_ofport(self, ofport):
161
-        if ofport in self.used_ofport:
162
-            self.used_ofport.remove(ofport)
163
-
164
-    def add_port(self, port_name):
165
-        b, p = self.lagopus_client.find_bridge_port(port_name, self.name)
166
-        if b is not None:
167
-            LOG.debug("port %s is already pluged.", port_name)
168
-            return
169
-
170
-        ofport = self.get_ofport()
171
-        self.port_mappings[port_name] = ofport
172
-        self.lagopus_client.bridge_add_port(self.name, port_name, ofport)
49
+    def __init__(self, resource_cls):
50
+        self.resource_cls = resource_cls
51
+
52
+    def add(self, name, *args):
53
+        # this method is called when initialization to register already
54
+        # exist resources.
55
+        obj = self.resource_cls(name, *args)
56
+        self[name] = obj
57
+        return obj
58
+
59
+    def create(self, name, *args):
60
+        if name not in self:
61
+            obj = self.resource_cls(name, *args)
62
+            obj.create()
63
+            self[name] = obj
64
+            lg_lib.config_changed()
65
+        return self[name]
66
+
67
+    def destroy(self, name):
68
+        if name in self:
69
+            obj = self[name]
70
+            obj.destroy()
71
+            del self[name]
72
+            lg_lib.config_changed()
73
+
74
+    def show(self):
75
+        return self.resource_cls.show()
76
+
77
+    def mk_name(self, *args):
78
+        return self.resource_cls.mk_name(*args)
173 79
 
174 80
 
175 81
 @profiler.trace_cls("rpc")
176 82
 class LagopusManager(object):
177 83
 
178 84
     def __init__(self, ryu_app, bridge_mappings):
179
-        self.lagopus_client = lagopus_lib.LagopusCommand()
180 85
         self.bridge_mappings = bridge_mappings
181 86
         self.ryu_app = ryu_app
182
-
183
-        raw_bridges = self._get_init_bridges()
184
-
185
-        self.bridges = {}
186
-        name_to_dpid = {}
187
-        bridge_names = bridge_mappings.values()
188
-        for raw_bridge in raw_bridges:
189
-            name = raw_bridge["name"]
190
-            dpid = raw_bridge["dpid"]
191
-            ports = raw_bridge["ports"]
192
-            self.bridges[dpid] = LagopusBridge(ryu_app, name, dpid, ports)
193
-            if name in bridge_names:
194
-                name_to_dpid[name] = dpid
195
-
196
-        self.phys_to_dpid = {}
87
+        self.serializer = eventlet.semaphore.Semaphore()
88
+
89
+        self._wait_lagopus_initialized()
90
+
91
+        lg_lib.register_config_change_callback(self._rebuild_dsl)
92
+
93
+        # initialize device caches
94
+        # channel
95
+        self.channels = LagopusCache(lg_lib.LagopusChannel)
96
+        raw_data = self.channels.show()
97
+        LOG.debug("channels: %s", raw_data)
98
+        for item in raw_data:
99
+            self.channels.add(item["name"])
100
+
101
+        # controller
102
+        self.controllers = LagopusCache(lg_lib.LagopusController)
103
+        raw_data = self.controllers.show()
104
+        LOG.debug("controllers: %s", raw_data)
105
+        for item in raw_data:
106
+            self.controllers.add(item["name"], item["channel"])
107
+
108
+        # interface
109
+        self.interfaces = LagopusCache(lg_lib.LagopusInterface)
110
+        raw_data = self.interfaces.show()
111
+        LOG.debug("interfaces: %s", raw_data)
112
+        for item in raw_data:
113
+            interface = self.interfaces.add(item["name"], item["type"],
114
+                                            item["device"],
115
+                                            item.get("port-number"))
116
+
117
+        # port
118
+        self.ports = LagopusCache(lg_lib.LagopusPort)
119
+        raw_data = self.ports.show()
120
+        LOG.debug("ports: %s", raw_data)
121
+        for item in raw_data:
122
+            interface = self.interfaces[item["interface"]]
123
+            self.ports.add(item["name"], interface)
124
+            interface.used()
125
+
126
+        # bridge
127
+        self.bridges = LagopusCache(lg_lib.LagopusBridge)
128
+        raw_data = self.bridges.show()
129
+        LOG.debug("bridges: %s", raw_data)
130
+        phys_bridge_names = bridge_mappings.values()
131
+        for item in raw_data:
132
+            b_name = item["name"]
133
+            controller = item["controllers"][0][1:]  # remove ":"
134
+            b_type = (lg_lib.BRIDGE_TYPE_PHYS if b_name in phys_bridge_names
135
+                      else lg_lib.BRIDGE_TYPE_VLAN)
136
+            bridge = self.bridges.add(b_name, ryu_app, controller,
137
+                                      item["dpid"], b_type, item["is-enabled"])
138
+            for p_name, ofport in item["ports"].items():
139
+                port = self.ports[p_name[1:]]  # remove ":"
140
+                bridge.add_port(port, ofport)
141
+
142
+        # check physical bridge existence
143
+        self.phys_to_bridge = {}
197 144
         for phys_net, name in bridge_mappings.items():
198
-            if name not in name_to_dpid:
145
+            if name not in self.bridges:
199 146
                 LOG.error("Bridge %s not found.", name)
200 147
                 sys.exit(1)
201
-            self.phys_to_dpid[phys_net] = name_to_dpid[name]
202
-        LOG.debug("phys_to_dpid: %s", self.phys_to_dpid)
203
-
204
-        interfaces = self.lagopus_client.show_interfaces()
205
-        ports = self.lagopus_client.show_ports()
206
-        LOG.debug("interfaces: %s", interfaces)
207
-        LOG.debug("ports: %s", ports)
208
-
209
-        # init vhost
210
-        vhost_interfaces = [inter for inter in interfaces
211
-                            if inter["device"].startswith("eth_vhost")]
212
-        self.num_vhost = len(vhost_interfaces)
213
-        used_interfaces = [p["interface"] for p in ports]
214
-        self.used_vhost_id = []
215
-        for inter in vhost_interfaces:
216
-            if inter["name"] in used_interfaces:
217
-                vhost_dev = inter['device'].split(',')[0]
218
-                vhost_id = int(vhost_dev[len("eth_vhost"):])
219
-                self.used_vhost_id.append(vhost_id)
220
-        LOG.debug("num_vhost: %d, used_vhost_id: %s", self.num_vhost,
221
-                  self.used_vhost_id)
222
-
223
-        # init pipe
224
-        pipe_interfaces = [inter for inter in interfaces
225
-                           if inter["device"].startswith("eth_pipe")]
226
-        self.num_pipe = len(pipe_interfaces)
227
-        # TODO(hichihara) pipe interface does not remove now.
228
-
229
-    def _get_init_bridges(self):
148
+            self.phys_to_bridge[phys_net] = self.bridges[name]
149
+
150
+        # vost_id and pipe_id management
151
+        self.free_vhost_interfaces = []
152
+        self.num_vhost = 0
153
+        max_pipe_num = 0
154
+        for interface in self.interfaces.values():
155
+            if interface.type == lg_lib.INTERFACE_TYPE_VHOST:
156
+                self.num_vhost += 1
157
+                if not interface.is_used:
158
+                    sock_path = self._sock_path(interface.id)
159
+                    os.system("sudo chmod 777 %s" % sock_path)
160
+                    self.free_vhost_interfaces.append(interface)
161
+            elif interface.type == lg_lib.INTERFACE_TYPE_PIPE:
162
+                # only interested in even number
163
+                if interface.id % 2 == 0:
164
+                    max_pipe_num = max(max_pipe_num, interface.id)
165
+        self.next_pipe_id = max_pipe_num + 2
166
+
167
+        # make initial dsl
168
+        self._rebuild_dsl()
169
+
170
+    def _wait_lagopus_initialized(self):
230 171
         for retry in range(MAX_WAIT_LAGOPUS_RETRY):
231
-            raw_bridges = self.lagopus_client.show_bridges()
232
-            if raw_bridges:
233
-                LOG.debug("bridges: %s", raw_bridges)
234
-                return raw_bridges
235
-            LOG.debug("Lagopus may not be initialized. waiting")
172
+            try:
173
+                lg_lib.LagopusChannel.show()
174
+                return
175
+            except socket.error:
176
+                LOG.debug("Lagopus may not be initialized. waiting")
236 177
             eventlet.sleep(10)
237 178
         LOG.error("Lagopus isn't running")
238 179
         sys.exit(1)
239 180
 
240
-    def get_vhost_interface(self):
241
-        if self.num_vhost == len(self.used_vhost_id):
242
-            # create new vhost interface
243
-            vhost_id = self.num_vhost
244
-            sock_path = "/tmp/sock%d" % vhost_id
245
-            device = "eth_vhost%d,iface=%s" % (vhost_id, sock_path)
246
-            name = "vhost_%d" % vhost_id
247
-            self.lagopus_client.create_vhost_interface(name, device)
248
-            self.num_vhost += 1
249
-            LOG.debug("vhost %d added.", vhost_id)
250
-            os.system("sudo chmod 777 %s" % sock_path)
181
+    def _rebuild_dsl(self):
182
+        # TODO(oda): just for backup now. it is able to restart lagopus
183
+        # uging this dsl manually. replace actual dsl in the future.
184
+        path = "/tmp/lagopus-backup.dsl"  # path is temporary
185
+        with open(path, "w") as f:
186
+            for obj in self.channels.values():
187
+                f.write(obj.create_str())
188
+            for obj in self.controllers.values():
189
+                f.write(obj.create_str())
190
+            for obj in self.bridges.values():
191
+                f.write(obj.create_str())
192
+                f.write(obj.enable_str())
193
+            # make interfaces lexical order. it is intended to make
194
+            # 'pipe-0' in advance of 'pipe-1' for example.
195
+            for name in sorted(self.interfaces.keys()):
196
+                obj = self.interfaces[name]
197
+                f.write(obj.create_str())
198
+            for obj in self.ports.values():
199
+                f.write(obj.create_str())
200
+                if obj.bridge:
201
+                    f.write(obj.add_bridge_str())
202
+
203
+    def _sock_path(self, vhost_id):
204
+        return "/tmp/sock%d" % vhost_id
205
+
206
+    def create_pipe_interfaces(self, pipe_id):
207
+        i_name1 = self.interfaces.mk_name(lg_lib.INTERFACE_TYPE_PIPE, pipe_id)
208
+        i_name2 = self.interfaces.mk_name(lg_lib.INTERFACE_TYPE_PIPE,
209
+                                          pipe_id + 1)
210
+        device1 = "eth_pipe%d" % pipe_id
211
+        device2 = "eth_pipe%d,attach=%s" % (pipe_id + 1, device1)
212
+
213
+        inter1 = self.interfaces.create(i_name1, lg_lib.DEVICE_TYPE_PHYS,
214
+                                        device1)
215
+        inter2 = self.interfaces.create(i_name2, lg_lib.DEVICE_TYPE_PHYS,
216
+                                        device2)
217
+
218
+        return inter1, inter2
219
+
220
+    def _get_pipe_id(self):
221
+        pipe_id = self.next_pipe_id
222
+        self.next_pipe_id += 2
223
+        return pipe_id
224
+
225
+    def create_pipe_ports(self, bridge):
226
+        if bridge.pipe_id is not None:
227
+            pipe_id = bridge.pipe_id
251 228
         else:
252
-            for vhost_id in range(self.num_vhost):
253
-                if vhost_id not in self.used_vhost_id:
254
-                    sock_path = "/tmp/sock%d" % vhost_id
255
-                    name = "vhost_%d" % vhost_id
256
-                    break
257
-        self.used_vhost_id.append(vhost_id)
258
-        return name, sock_path
259
-
260
-    def free_vhost_id(self, vhost_id):
261
-        if vhost_id in self.used_vhost_id:
262
-            self.used_vhost_id.remove(vhost_id)
263
-
264
-    def port_to_vhost_id(self, port_id):
265
-        ports = self.lagopus_client.show_ports()
266
-        for port in ports:
267
-            if port["name"] == port_id:
268
-                interface = port["interface"]
269
-                if interface.startswith("vhost_"):
270
-                    vhost_id = int(interface[len("vhost_"):])
271
-                    return vhost_id
272
-                return
229
+            pipe_id = self._get_pipe_id()
273 230
 
274
-    def get_pipe(self):
275
-        name0 = "pipe-%d" % self.num_pipe
276
-        name1 = "pipe-%d" % (self.num_pipe + 1)
277
-        device0 = "eth_pipe%d" % self.num_pipe
278
-        device1 = "eth_pipe%d,attach=%s" % (self.num_pipe + 1, device0)
279
-        self.num_pipe += 2
280
-
281
-        self.lagopus_client.create_pipe_interface(name0, device0)
282
-        self.lagopus_client.create_pipe_interface(name1, device1)
283
-
284
-        return name0, name1
285
-
286
-    def get_all_devices(self):
287
-        devices = set()
288
-        ports = self.lagopus_client.show_ports()
289
-        for port in ports:
290
-            devices.add(port["name"])
291
-        LOG.debug("get_all_devices: %s", devices)
292
-        return devices
293
-
294
-    def _create_channel(self, channel):
295
-        data = self.lagopus_client.show_channels()
296
-        names = [d['name'] for d in data]
297
-        if channel not in names:
298
-            self.lagopus_client.create_channel(channel)
299
-
300
-    def _create_controller(self, controller, channel):
301
-        data = self.lagopus_client.show_controllers()
302
-        names = [d['name'] for d in data]
303
-        if controller not in names:
304
-            self.lagopus_client.create_controller(controller, channel)
305
-
306
-    def _create_bridge(self, brname, controller, dpid):
307
-        data = self.lagopus_client.show_bridges()
308
-        names = [d['name'] for d in data]
309
-        if brname not in names:
310
-            self.lagopus_client.create_bridge(brname, controller, dpid)
231
+        inter1, inter2 = self.create_pipe_interfaces(pipe_id)
232
+
233
+        p_name1 = self.ports.mk_name(lg_lib.INTERFACE_TYPE_PIPE, inter1.name)
234
+        p_name2 = self.ports.mk_name(lg_lib.INTERFACE_TYPE_PIPE, inter2.name)
235
+        port1 = self.ports.create(p_name1, inter1)
236
+        port2 = self.ports.create(p_name2, inter2)
237
+
238
+        return port1, port2
239
+
240
+    def create_bridge(self, b_name, dpid):
241
+        channel = self.channels.mk_name(b_name)
242
+        self.channels.create(channel)
243
+        controller = self.controllers.mk_name(b_name)
244
+        self.controllers.create(controller, channel)
245
+        bridge = self.bridges.create(b_name, self.ryu_app, controller, dpid)
246
+        return bridge
247
+
248
+    def bridge_add_port(self, bridge, port):
249
+        if port.bridge is None:
250
+            ofport = bridge.get_ofport()
251
+            bridge.bridge_add_port(port, ofport)
252
+
253
+    def bridge_del_port(self, port):
254
+        if port and port.bridge:
255
+            bridge = port.bridge
256
+            bridge.bridge_del_port(port)
311 257
 
312 258
     def get_bridge(self, segment):
313
-        vlan_id = (segment['segmentation_id']
314
-                   if segment['network_type'] == constants.TYPE_VLAN
315
-                   else 0)
316 259
         phys_net = segment['physical_network']
317
-        if phys_net not in self.phys_to_dpid:
318
-            # Error
319
-            return
320
-        dpid = (vlan_id << 48) | self.phys_to_dpid[phys_net]
321
-        LOG.debug("vlan_id %d phys dpid %d", vlan_id,
322
-                  self.phys_to_dpid[phys_net])
323
-        LOG.debug("dpid %d 0x%x", dpid, dpid)
324
-        if dpid in self.bridges:
325
-            return self.bridges[dpid]
326
-
327
-        # bridge for vlan physical_network does not exist.
328
-        # so create the bridge.
329
-        brname = "%s_%d" % (phys_net, vlan_id)
330
-        channel = "ch-%s" % brname
331
-        self._create_channel(channel)
332
-        controller = "con-%s" % brname
333
-        self._create_controller(controller, channel)
334
-        self._create_bridge(brname, controller, dpid)
335
-
336
-        bridge = LagopusBridge(self.ryu_app, brname, dpid, None)
337
-        self.bridges[dpid] = bridge
338
-
339
-        pipe1, pipe2 = self.get_pipe()
340
-        port1 = "p-%s" % pipe1
341
-        port2 = "p-%s" % pipe2
342
-        self.lagopus_client.create_port(port1, pipe1)
343
-        self.lagopus_client.create_port(port2, pipe2)
344
-
345
-        phys_bridge = self.bridges[self.phys_to_dpid[phys_net]]
346
-        bridge.add_port(port1)
347
-        phys_bridge.add_port(port2)
260
+        phys_bridge = self.phys_to_bridge.get(phys_net)
261
+        if phys_bridge is None:
262
+            # basically this can't be happen since neutron-server
263
+            # already checked before issuing RPC.
264
+            raise ValueError("%s is not configured." % phys_net)
265
+
266
+        if (segment['network_type'] == constants.TYPE_FLAT):
267
+            return phys_bridge
268
+
269
+        vlan_id = segment['segmentation_id']
270
+        b_name = self.bridges.mk_name(phys_net, vlan_id)
271
+        bridge = self.bridges.get(b_name)
272
+        if bridge is None:
273
+            # vlan bridge does not exeist. so create the bridge
274
+            dpid = (vlan_id << 48) | phys_bridge.dpid
275
+            bridge = self.create_bridge(b_name, dpid)
276
+        elif not bridge.is_enabled:
277
+            bridge.enable()
278
+
279
+        # make sure there is pipe connection between phys_bridge
280
+        port1, port2 = self.create_pipe_ports(bridge)
281
+        self.bridge_add_port(bridge, port1)
282
+        self.bridge_add_port(phys_bridge, port2)
348 283
 
349 284
         phys_bridge.install_vlan(vlan_id, port2)
350 285
 
351 286
         return bridge
352 287
 
288
+    def create_vhost_interface(self, vhost_id):
289
+        i_name = self.interfaces.mk_name(lg_lib.INTERFACE_TYPE_VHOST, vhost_id)
290
+        sock_path = self._sock_path(vhost_id)
291
+        device = "eth_vhost%d,iface=%s" % (vhost_id, sock_path)
292
+        interface = self.interfaces.create(i_name, lg_lib.DEVICE_TYPE_PHYS,
293
+                                           device)
294
+        LOG.debug("vhost %d added.", vhost_id)
295
+        os.system("sudo chmod 777 %s" % sock_path)
296
+        return interface
297
+
298
+    def get_vhost_interface(self):
299
+        if self.free_vhost_interfaces:
300
+            return self.free_vhost_interfaces.pop()
301
+
302
+        # create new vhost interface
303
+        vhost_id = self.num_vhost
304
+        interface = self.create_vhost_interface(vhost_id)
305
+        self.num_vhost += 1
306
+        return interface
307
+
308
+    def create_vhost_port(self, p_name):
309
+        if p_name not in self.ports:
310
+            interface = self.get_vhost_interface()
311
+            self.ports.create(p_name, interface)
312
+        return self.ports[p_name]
313
+
353 314
     @log_helpers.log_method_call
354 315
     def plug_vhost(self, context, **kwargs):
355
-        port_id = kwargs['port_id']
316
+        p_name = self.ports.mk_name(lg_lib.INTERFACE_TYPE_VHOST,
317
+                                    kwargs['port_id'])
356 318
         segment = kwargs['segment']
357 319
 
358
-        bridge = self.get_bridge(segment)
359
-        if not bridge:
360
-            # raise
361
-            return
320
+        with self.serializer:
321
+            port = self.create_vhost_port(p_name)
322
+            bridge = self.get_bridge(segment)
323
+            self.bridge_add_port(bridge, port)
362 324
 
363
-        interface_name, sock_path = self.get_vhost_interface()
364
-        self.lagopus_client.create_port(port_id, interface_name)
365
-        bridge.add_port(port_id)
366
-        return sock_path
325
+            return self._sock_path(port.interface.id)
367 326
 
368 327
     @log_helpers.log_method_call
369 328
     def unplug_vhost(self, context, **kwargs):
370
-        port_id = kwargs['port_id']
371
-        bridge_name, _ = self.lagopus_client.find_bridge_port(port_id)
372
-        if not bridge_name:
373
-            LOG.debug("port %s is already unpluged.", port_id)
374
-            return
375
-        self.lagopus_client.bridge_del_port(bridge_name, port_id)
376
-        vhost_id = self.port_to_vhost_id(port_id)
377
-        self.lagopus_client.destroy_port(port_id)
378
-        if vhost_id:
379
-            self.free_vhost_id(vhost_id)
329
+        p_name = self.ports.mk_name(lg_lib.INTERFACE_TYPE_VHOST,
330
+                                    kwargs['port_id'])
331
+
332
+        with self.serializer:
333
+            port = self.ports.get(p_name)
334
+            if port:
335
+                self.bridge_del_port(port)
336
+                interface = port.interface
337
+                self.ports.destroy(p_name)
338
+                self.free_vhost_interfaces.append(interface)
380 339
 
381 340
     @log_helpers.log_method_call
382 341
     def plug_rawsock(self, context, **kwargs):
383 342
         device = kwargs['device']
384 343
         segment = kwargs['segment']
385
-
386
-        if segment is None:
387
-            LOG.debug("no segment. port may not exist.")
388
-            return
389
-
390
-        bridge = self.get_bridge(segment)
391
-        if not bridge:
392
-            return
393
-
394
-        interface_name = 'i' + device
395
-        port_name = 'p' + device
396
-        self.lagopus_client.create_rawsock_interface(interface_name, device)
397
-        self.lagopus_client.create_port(port_name, interface_name)
398
-        bridge.add_port(port_name)
399
-
400
-        return True
344
+        i_name = self.interfaces.mk_name(lg_lib.INTERFACE_TYPE_RAWSOCK, device)
345
+        p_name = self.ports.mk_name(lg_lib.INTERFACE_TYPE_RAWSOCK, device)
346
+
347
+        with self.serializer:
348
+            if not ip_lib.device_exists(device):
349
+                raise RuntimeError("interface %s does not exist.", device)
350
+            interface = self.interfaces.create(i_name,
351
+                                               lg_lib.DEVICE_TYPE_RAWSOCK,
352
+                                               device)
353
+            port = self.ports.create(p_name, interface)
354
+            bridge = self.get_bridge(segment)
355
+            self.bridge_add_port(bridge, port)
401 356
 
402 357
     @log_helpers.log_method_call
403 358
     def unplug_rawsock(self, context, **kwargs):
404 359
         device = kwargs['device']
405
-        interface_name = 'i' + device
406
-        port_name = 'p' + device
407
-
408
-        bridge_name, _ = self.lagopus_client.find_bridge_port(port_name)
409
-        if not bridge_name:
410
-            LOG.debug("device %s is already unpluged.", device)
411
-            return
360
+        i_name = self.interfaces.mk_name(lg_lib.INTERFACE_TYPE_RAWSOCK, device)
361
+        p_name = self.ports.mk_name(lg_lib.INTERFACE_TYPE_RAWSOCK, device)
412 362
 
413
-        self.lagopus_client.bridge_del_port(bridge_name, port_name)
414
-        self.lagopus_client.destroy_port(port_name)
415
-        self.lagopus_client.destroy_interface(interface_name)
363
+        with self.serializer:
364
+            self.bridge_del_port(self.ports.get(p_name))
365
+            self.ports.destroy(p_name)
366
+            self.interfaces.destroy(i_name)
416 367
 
417 368
 
418 369
 class LagopusAgent(service.Service):
@@ -456,7 +407,7 @@ class LagopusAgent(service.Service):
456 407
 
457 408
     def _report_state(self):
458 409
         try:
459
-            devices = len(self.manager.get_all_devices())
410
+            devices = len(self.manager.ports)
460 411
             self.agent_state['configurations']['devices'] = devices
461 412
             self.state_rpc.report_state(self.context, self.agent_state, True)
462 413
             # we only want to update resource versions on startup

+ 5
- 4
networking_lagopus/ml2/mech_driver/mech_lagopus.py View File

@@ -67,16 +67,17 @@ class LagopusMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
67 67
 
68 68
     @log_helpers.log_method_call
69 69
     def update_port_postcommit(self, context):
70
-        if (context.original_host
71
-                and context.original_vif_type == 'vhostuser'
72
-                and not context.host and context.vif_type == 'unbound'):
70
+        if (context.original_host and not context.host
71
+                and context.original_vif_type in ('vhostuser',
72
+                                                  'binding_failed')):
73 73
             self.lagopus_api.unplug_vhost(self.context,
74 74
                                           context.current['id'],
75 75
                                           context.original_host)
76 76
 
77 77
     @log_helpers.log_method_call
78 78
     def delete_port_postcommit(self, context):
79
-        if context.host and context.vif_type == 'vhostuser':
79
+        if (context.host
80
+                and context.vif_type in ('vhostuser', 'binding_failed')):
80 81
             self.lagopus_api.unplug_vhost(self.context,
81 82
                                           context.current['id'],
82 83
                                           context.host)

+ 0
- 0
networking_lagopus/tests/unit/__init__.py View File


+ 0
- 0
networking_lagopus/tests/unit/agent/__init__.py View File


+ 262
- 89
networking_lagopus/tests/unit/agent/test_lagopus_lib.py View File

@@ -11,122 +11,295 @@
11 11
 # under the License.
12 12
 
13 13
 import mock
14
+from oslo_utils import uuidutils
15
+from ryu.app.ofctl import api as ofctl_api
14 16
 
15 17
 from neutron.tests import base
16 18
 
17 19
 from networking_lagopus.agent import lagopus_lib
20
+from networking_lagopus.agent import lagosh
18 21
 
22
+# TODO(oda): delete unit tests temporarily since the code is heavily refacored.
23
+# make unit tests later.
19 24
 
20
-class TestLagopusLib(base.BaseTestCase):
25
+
26
+class TestLagopusResource(base.BaseTestCase):
27
+
28
+    class FakeResource(lagopus_lib.LagopusResource):
29
+        resource = "test"
21 30
 
22 31
     def setUp(self):
23
-        super(TestLagopusLib, self).setUp()
24
-        self.lagosh = mock.patch.object(lagopus_lib.LagopusCommand, "_lagosh",
32
+        super(TestLagopusResource, self).setUp()
33
+        self.lagosh = mock.patch.object(lagosh.ds_client, "call",
25 34
                                         return_value=None).start()
26
-        self.lagopus_client = lagopus_lib.LagopusCommand()
35
+        self.test_resource = self.FakeResource("test_resource")
27 36
 
28
-    def test_show_interfaces(self):
29
-        expected_cmd = "interface\n"
30
-        self.lagopus_client.show_interfaces()
31
-        self.lagosh.assert_called_with(expected_cmd)
37
+    def test_create_param_str(self):
38
+        self.assertEqual("", self.test_resource.create_param_str())
32 39
 
33
-    def test_show_ports(self):
34
-        expected_cmd = "port\n"
35
-        self.lagopus_client.show_ports()
36
-        self.lagosh.assert_called_with(expected_cmd)
40
+    def test_create_str(self):
41
+        self.assertEqual("test test_resource create\n",
42
+                         self.test_resource.create_str())
37 43
 
38
-    def test_show_bridges(self):
39
-        expected_cmd = "bridge\n"
40
-        self.lagopus_client.show_bridges()
44
+    def test__exec(self):
45
+        cmd = "test test_resource create\n"
46
+        self.test_resource._exec(cmd)
47
+        self.lagosh.assert_called_with(cmd)
48
+
49
+    def test_create(self):
50
+        expected_cmd = "test test_resource create\n"
51
+        self.test_resource.create()
41 52
         self.lagosh.assert_called_with(expected_cmd)
42 53
 
43
-    def test_show_channels(self):
44
-        expected_cmd = "channel\n"
45
-        self.lagopus_client.show_channels()
54
+    def test_destroy(self):
55
+        expected_cmd = "test test_resource destroy\n"
56
+        self.test_resource.destroy()
46 57
         self.lagosh.assert_called_with(expected_cmd)
47 58
 
48
-    def test_create_controller(self):
49
-        name = "test-controller"
50
-        channel = "test-channel"
51
-        expected_cmd = ("controller %s create -channel %s -role equal "
52
-                        "-connection-type main\n") % (name, channel)
53
-        self.lagopus_client.create_controller(name, channel)
59
+    def test_show(self):
60
+        expected_cmd = "test\n"
61
+        self.FakeResource.show()
54 62
         self.lagosh.assert_called_with(expected_cmd)
55 63
 
56
-    def test_create_bridge(self):
57
-        name = "test-bridge"
64
+    def test_mk_name(self):
65
+        self.assertEqual("unknown", self.FakeResource.mk_name())
66
+
67
+
68
+class TestLagopusChannel(base.BaseTestCase):
69
+
70
+    def setUp(self):
71
+        super(TestLagopusChannel, self).setUp()
72
+        self.lagosh = mock.patch.object(lagosh.ds_client, "call",
73
+                                        return_value=None).start()
74
+        self.test_controller = lagopus_lib.LagopusController("controller",
75
+                                                             "channel")
76
+
77
+    def test_create_param_str(self):
78
+        expected_result = "-channel channel -role equal -connection-type main"
79
+        self.assertEqual(expected_result,
80
+                         self.test_controller.create_param_str())
81
+
82
+    def test_mk_name(self):
83
+        bridge = "test-bridge"
84
+        expected_result = "con-%s" % bridge
85
+        result = lagopus_lib.LagopusController("controller",
86
+                                               "channel").mk_name(bridge)
87
+        self.assertEqual(expected_result, result)
88
+
89
+
90
+class TestLagopusInterface(base.BaseTestCase):
91
+
92
+    def setUp(self):
93
+        super(TestLagopusInterface, self).setUp()
94
+        self.lagosh = mock.patch.object(lagosh.ds_client, "call",
95
+                                        return_value=None).start()
96
+        name = "interface"
97
+        self.dev_type = lagopus_lib.DEVICE_TYPE_PHYS
98
+        self.device = "eth_vhost1,iface=/tmp/sock1"
99
+        self.test_interface = lagopus_lib.LagopusInterface(name, self.dev_type,
100
+                                                           self.device)
101
+
102
+    def test__get_interface_type(self):
103
+        # vhost
104
+        self.assertEqual(lagopus_lib.INTERFACE_TYPE_VHOST,
105
+                         self.test_interface._get_interface_type())
106
+
107
+        # pipe
108
+        self.test_interface.device = "eth_pipe1"
109
+        self.assertEqual(lagopus_lib.INTERFACE_TYPE_PIPE,
110
+                         self.test_interface._get_interface_type())
111
+
112
+        # Physical Interface
113
+        self.test_interface.device = ""
114
+        self.assertEqual(lagopus_lib.INTERFACE_TYPE_PHYS,
115
+                         self.test_interface._get_interface_type())
116
+
117
+        # raw socket
118
+        self.test_interface.dev_type = lagopus_lib.DEVICE_TYPE_RAWSOCK
119
+        self.assertEqual(lagopus_lib.INTERFACE_TYPE_RAWSOCK,
120
+                         self.test_interface._get_interface_type())
121
+
122
+    def test__get_id_for_type(self):
123
+        # vhost
124
+        self.assertEqual(1, self.test_interface._get_id_for_type())
125
+
126
+        # pipe
127
+        self.test_interface.type = lagopus_lib.INTERFACE_TYPE_PIPE
128
+        self.test_interface.device = "eth_pipe2,attach=eth_pipe1"
129
+        self.assertEqual(2, self.test_interface._get_id_for_type())
130
+
131
+    def test_create_param_str(self):
132
+        # vhost
133
+        expected_result = "-type %s -device %s" % (self.dev_type, self.device)
134
+        self.assertEqual(expected_result,
135
+                         self.test_interface.create_param_str())
136
+
137
+        # Physical Interface
138
+        self.test_interface.type = lagopus_lib.INTERFACE_TYPE_PHYS
139
+        expected_result = "-type %s -port-number 0" % self.dev_type
140
+        self.assertEqual(expected_result,
141
+                         self.test_interface.create_param_str())
142
+
143
+    def test_mk_name(self):
144
+        # vhost
145
+        expected_result = "vhost_1"
146
+        self.assertEqual(
147
+            expected_result,
148
+            self.test_interface.mk_name(lagopus_lib.INTERFACE_TYPE_VHOST, 1))
149
+
150
+        # pipe
151
+        expected_result = "pipe-1"
152
+        self.assertEqual(
153
+            expected_result,
154
+            self.test_interface.mk_name(lagopus_lib.INTERFACE_TYPE_PIPE, 1))
155
+
156
+        # raw socket
157
+        expected_result = "i1"
158
+        self.assertEqual(
159
+            expected_result,
160
+            self.test_interface.mk_name(lagopus_lib.INTERFACE_TYPE_RAWSOCK, 1))
161
+
162
+
163
+class TestLagopusPort(base.BaseTestCase):
164
+
165
+    def setUp(self):
166
+        super(TestLagopusPort, self).setUp()
167
+        self.lagosh = mock.patch.object(lagosh.ds_client, "call",
168
+                                        return_value=None).start()
169
+        name = "port"
170
+        dev_type = lagopus_lib.DEVICE_TYPE_PHYS
171
+        device = "eth_vhost1,iface=/tmp/sock1"
172
+        interface_name = "test-interface"
173
+        self.test_interface = lagopus_lib.LagopusInterface(interface_name,
174
+                                                           dev_type,
175
+                                                           device)
176
+        self.test_port = lagopus_lib.LagopusPort(name, self.test_interface)
177
+
178
+    def test_create_param_str(self):
179
+        self.assertEqual("-interface test-interface",
180
+                         self.test_port.create_param_str())
181
+
182
+    def test_add_bridge_str(self):
183
+        self.assertIsNone(self.test_port.add_bridge_str())
184
+        self.test_port.bridge = mock.Mock()
185
+        self.test_port.bridge.name = "test-bridge"
186
+        self.test_port.ofport = "1"
187
+        self.assertEqual("bridge test-bridge config -port port 1\n",
188
+                         self.test_port.add_bridge_str())
189
+
190
+    def test_create(self):
191
+        self.test_port.interface.is_used = False
192
+        self.test_port.create()
193
+        self.assertTrue(self.test_port.interface.is_used)
194
+
195
+    def test_destroy(self):
196
+        self.test_port.interface.is_used = True
197
+        self.test_port.destroy()
198
+        self.assertFalse(self.test_port.interface.is_used)
199
+
200
+    def test_mk_name(self):
201
+        # vhost
202
+        port_id = uuidutils.generate_uuid()
203
+        self.assertEqual(
204
+            port_id,
205
+            self.test_port.mk_name(lagopus_lib.INTERFACE_TYPE_VHOST, port_id))
206
+
207
+        # pipe
208
+        pipe_interface = "pipe-1"
209
+        self.assertEqual(
210
+            "p-" + pipe_interface,
211
+            self.test_port.mk_name(lagopus_lib.INTERFACE_TYPE_PIPE,
212
+                                   pipe_interface))
213
+
214
+        # raw socket
215
+        device = "taptest"
216
+        self.assertEqual(
217
+            "p" + device,
218
+            self.test_port.mk_name(lagopus_lib.INTERFACE_TYPE_RAWSOCK,
219
+                                   device))
220
+
221
+
222
+class TestLagopusBridge(base.BaseTestCase):
223
+
224
+    def setUp(self):
225
+        super(TestLagopusBridge, self).setUp()
226
+        self.lagosh = mock.patch.object(lagosh.ds_client, "call",
227
+                                        return_value=None).start()
228
+        self.ofctl = mock.patch.object(ofctl_api, "send_msg",
229
+                                       return_value=None).start()
230
+        self.ofctl_get_datapath = mock.patch.object(
231
+            ofctl_api,
232
+            "get_datapath",
233
+            return_value=mock.Mock()).start()
234
+        name = "bridge"
235
+        ryu_app = mock.Mock()
58 236
         controller = "test-controller"
59 237
         dpid = 1
60
-        expected_cmd_bridge_create = ("bridge %s create -controller %s "
61
-                                      "-dpid %d -l2-bridge True "
62
-                                      "-mactable-ageing-time 300 "
63
-                                      "-mactable-max-entries "
64
-                                      "8192\n") % (name, controller, dpid)
65
-        expected_cmd_bridge_enable = "bridge %s enable\n" % name
66
-        self.lagopus_client.create_bridge(name, controller, dpid)
67
-        expected_calls = [mock.call(expected_cmd_bridge_create),
68
-                          mock.call(expected_cmd_bridge_enable)]
69
-        self.lagosh.assert_has_calls(expected_calls)
70
-
71
-    def test_create_vhost_interface(self):
72
-        name = "test-interface"
73
-        device = "test-device"
74
-        expected_cmd = ("interface %s create -type ethernet-dpdk-phy "
75
-                        "-device %s\n") % (name, device)
76
-        self.lagopus_client.create_vhost_interface(name, device)
77
-        self.lagosh.assert_called_with(expected_cmd)
238
+        self.test_bridge = lagopus_lib.LagopusBridge(name, ryu_app,
239
+                                                     controller, dpid,
240
+                                                     is_enabled=True)
78 241
 
79
-    def test_create_pipe_interface(self):
80
-        name = "test-interface"
81
-        device = "test-device"
82
-        expected_cmd = ("interface %s create -type ethernet-dpdk-phy "
83
-                        "-device %s\n") % (name, device)
84
-        self.lagopus_client.create_pipe_interface(name, device)
85
-        self.lagosh.assert_called_with(expected_cmd)
242
+    def test_get_ofport(self):
243
+        self.test_bridge.max_ofport = 0
244
+        self.assertEqual(1, self.test_bridge.get_ofport())
245
+        self.test_bridge.max_ofport = lagopus_lib.OFPP_MAX + 1
246
+        self.test_bridge.used_ofport = [1, 3]
247
+        self.assertEqual(2, self.test_bridge.get_ofport())
86 248
 
87
-    def test_create_rawsock_interface(self):
88
-        name = "test-interface"
89
-        device = "test-device"
90
-        expected_cmd = ("interface %s create -type ethernet-rawsock "
91
-                        "-device %s\n") % (name, device)
92
-        self.lagopus_client.create_rawsock_interface(name, device)
93
-        self.lagosh.assert_called_with(expected_cmd)
249
+    def test_add_port(self):
250
+        port = mock.Mock()
251
+        ofport = 11
252
+        self.test_bridge.max_ofport = 10
253
+        port.interface.type = lagopus_lib.INTERFACE_TYPE_PIPE
254
+        port.interface.id = 1
255
+        self.test_bridge.add_port(port, ofport)
256
+        self.assertEqual(ofport, self.test_bridge.max_ofport)
257
+        self.assertEqual(1, self.test_bridge.pipe_id)
94 258
 
95
-    def test_create_port(self):
96
-        port = "test-port"
97
-        interface = "test-interface"
98
-        expected_cmd = "port %s create -interface %s\n" % (port, interface)
99
-        self.lagopus_client.create_port(port, interface)
100
-        self.lagosh.assert_called_with(expected_cmd)
259
+    def test_del_port(self):
260
+        port = mock.Mock()
261
+        port.bridge = self.test_bridge
262
+        port.ofport = 10
263
+        self.test_bridge.used_ofport = [1, 2, 10]
264
+        self.test_bridge.del_port(port)
265
+        self.assertNotIn(10, self.test_bridge.used_ofport)
266
+        self.assertIsNone(port.bridge)
267
+        self.assertIsNone(port.ofport)
101 268
 
102
-    def test_destroy_port(self):
103
-        port = "test-port"
104
-        expected_cmd = "port %s destroy\n" % port
105
-        self.lagopus_client.destroy_port(port)
106
-        self.lagosh.assert_called_with(expected_cmd)
269
+    def test_create_param_str(self):
270
+        self.assertEqual("-controller test-controller -dpid 1 "
271
+                         "-l2-bridge True -mactable-ageing-time 300 "
272
+                         "-mactable-max-entries 8192",
273
+                         self.test_bridge.create_param_str())
107 274
 
108
-    def test_destroy_interface(self):
109
-        interface = "test-interface"
110
-        expected_cmd = "interface %s destroy\n" % interface
111
-        self.lagopus_client.destroy_interface(interface)
112
-        self.lagosh.assert_called_with(expected_cmd)
275
+    def test_enable_str(self):
276
+        self.assertEqual("bridge bridge enable\n",
277
+                         self.test_bridge.enable_str())
113 278
 
114 279
     def test_bridge_add_port(self):
115
-        bridge_name = "test-bridge"
116
-        port_name = "test-port"
280
+        port = mock.Mock()
281
+        port.name = "port"
117 282
         ofport = 1
118
-        expected_cmd = ("bridge %s config -port %s %d\n" %
119
-                        (bridge_name, port_name, ofport))
120
-        self.lagopus_client.bridge_add_port(bridge_name,
121
-                                            port_name,
122
-                                            ofport)
123
-        self.lagosh.assert_called_with(expected_cmd)
283
+        expected_cmd = "bridge bridge config -port port 1\n"
284
+        with mock.patch.object(lagopus_lib.LagopusBridge,
285
+                               "add_port") as f:
286
+            self.test_bridge.bridge_add_port(port, ofport)
287
+            self.lagosh.assert_called_with(expected_cmd)
288
+            f.assert_called_with(port, ofport)
124 289
 
125 290
     def test_bridge_del_port(self):
126
-        bridge_name = "test-bridge"
127
-        port_name = "test-port"
128
-        expected_cmd = ("bridge %s config -port -%s\n" %
129
-                        (bridge_name, port_name))
130
-        self.lagopus_client.bridge_del_port(bridge_name,
131
-                                            port_name)
132
-        self.lagosh.assert_called_with(expected_cmd)
291
+        port = mock.Mock()
292
+        port.name = "port"
293
+        expected_cmd = "bridge bridge config -port -port\n"
294
+        with mock.patch.object(lagopus_lib.LagopusBridge,
295
+                               "del_port") as f:
296
+            self.test_bridge.bridge_del_port(port)
297
+            self.lagosh.assert_called_with(expected_cmd)
298
+            f.assert_called_with(port)
299
+
300
+    def test_mk_name(self):
301
+        phys_net = "test-physical"
302
+        vlan_id = 1
303
+        self.assertEqual(phys_net + "_" + str(vlan_id),
304
+                         self.test_bridge.mk_name(phys_net,
305
+                                                  vlan_id))

Loading…
Cancel
Save