首页 科技正文

环球国际官网:着手实现一个较为简朴的MQTT服务端和客户端

admin 科技 2020-07-20 52 0

项目地址:https://github.com/hnlyf168/DotNet.Framework

昨天晚上大致测试了下 ,490个客户端(一个收一个发)  平均估量每个每秒60个包  使用mqtt协议  发送一个guid的字符串   服务器转发每秒约莫1.2-1.3w 
cpu  占用:25% 一下
内存似乎都在50m以下

 

 

1、协议简介

  MQTT是一个基于客户端-服务器的新闻公布/订阅传输协议。MQTT协议是轻量、简朴、开放和易于实现的,这些特点使它适用范围异常普遍。在许多情况下,包罗受限的环境中,如:机械与机械(M2M)通讯和物联网(IoT)。其在,通过卫星链路通讯传感器、偶然拨号的医疗装备、智能家居、及一些小型化装备中已普遍使用。

  详细就不在这里记录了,写这个服务端和客户端也只是为了加倍深入的学习mqtt协议。

 

2、mqtt的几种控制报文类型

名字

报文流动偏向

形貌

Reserved

0

克制

保留

CONNECT

1

客户端到服务端

客户端请求毗邻服务端

CONNACK

2

服务端到客户端

毗邻报文确认

PUBLISH

3

两个偏向都允许

公布新闻

PUBACK

4

两个偏向都允许

QoS       1新闻公布收到确认

PUBREC

5

两个偏向都允许

公布收到(保证交付第一步)

PUBREL

6

两个偏向都允许

公布释放(保证交付第二步)

PUBCOMP

7

两个偏向都允许

QoS       2新闻公布完成(保证交互第三步)

SUBSCRIBE

8

客户端到服务端

客户端订阅请求

SUBACK

9

服务端到客户端

订阅请求报文确认

UNSUBSCRIBE

10

客户端到服务端

客户端作废订阅请求

UNSUBACK

11

服务端到客户端

作废订阅报文确认

PINGREQ

12

客户端到服务端

心跳请求

PINGRESP

13

服务端到客户端

心跳响应

DISCONNECT

14

客户端到服务端

客户端断开毗邻

Reserved

15

克制

保留

 

 

2.1、协议头

    每个MQTT控制报文都包罗一个牢固报头,

  

      牢固报头的花样

Bit

7

6

5

4

3

2

1

0

byte       1

MQTT控制报文的类型

用于指定控制报文类型的标志位

byte       2...

 

剩余长度

 

 

 

     剩余长度的盘算方式:

剩余长度(Remaining      Length)示意当前报文剩余部门的字节数,包罗可变报头和负载的数据。剩余长度不包罗用于编码剩余长度字段自己的字节数。

剩余长度字段使用一个变长度编码方案,对小于128的值它使用单字节编码。更大的值按下面的方式处置。低7位有用位用于编码数据,最高有用位用于指示是否有更多的字节。因此每个字节可以编码128个数值和一个延续位(continuation     bit)。剩余长度字段最大4个字节

例如,十进制数64会被编码为一个字节,数值是64,十六进制示意为0x40,。十进制数字

321(=65+2*128)被编码为两个字节,最低有用位在前。第一个字节是    65+128=193。注重最高位为1示意后面至少另有一个字节。第二个字节是2。

.net 盘算方式代码如下:

/// <summary>
        /// 获取一个长度数据
        /// </summary>
        /// <returns></returns>
        protected virtual Result<int> ReadLength()
        {
            var result = this.Socket.ReceiveBytes(1);
            if (!result.Success)
            {
                WriteLog("获取mqtt 长度失败");
                return new Result<int>(result);
            }
            var msgType = result.Data[0];
            var msgLength = msgType & 127;//取低7为的值,由于可变长度有用值只有低7位,第8位用来标识下一个字节是否属于长度字节
            var leftBit = 7;
            while (msgType >> 7 == 1)//判断最高位是否为1,若是为1则说明后面的1个字节也是属于长度字节
            {
                result = this.Socket.ReceiveBytes(1);
                if (!result.Success)
                {
                    WriteLog("获取mqtt 长度失败");
                    return new Result<int>(result);
                }
                msgType = result.Data[0];
                msgLength = ((msgType & 127) << leftBit) | msgLength;// 由于mqtt 可变长度的字节是低位在前,以是新取到的长度要左移取到的次数*7位在|原来的长度。
                leftBit += 7;
            }
            return msgLength;
        }

2.2、CONNECT   –     毗邻服务端

协议花样

 

可看到所需要的参数,于是界说一个毗邻信息类来保留

/// <summary>
    /// mqtt 毗邻信息。
    /// </summary>
    public class MQTTConnectInfo
    {
        /// <summary>
        /// 客户端编号
        /// </summary>
        public virtual string ClientId { get; set; }
        /// <summary>
        /// 用户名
        /// </summary>
        public virtual string UserName { get; set; }
        /// <summary>
        /// 用户密码
        /// </summary>
        public virtual string Password { get; set; }
        /// <summary>
        /// 遗嘱保留
        /// </summary>
        public virtual bool WillRetain { get; set; }
        /// <summary>
        /// 遗嘱QoS
        /// </summary>
        public virtual Qos WillQos { get; set; }
        /// <summary>
        /// 遗嘱标志 
        /// </summary>
        public virtual bool WillFlag { get; set; }
        /// <summary>
        /// 是否消灭对话。
        /// </summary>
        public virtual bool CleanSession { get; set; }
        /// <summary>
        /// 保持毗邻
        /// <para>忠告:这里的单元是秒</para>
        /// </summary>
        public virtual ushort KeepAlive { get; set; } = 10;
    }
View Code

然后就是代码按协议花样组装好

代码如下:

环球国际官网:着手实现一个较为简朴的MQTT服务端和客户端 第2张
/// <summary>
        /// 获取包完整的字节
        /// </summary>
        /// <returns></returns>
        public override byte[] ToBytes()
        {
            var list = new List<byte>();
            var mqttBytes = ProtocolName.ToBytes(Encoding.ASCII);//协议名称:牢固位MQTT
            list.Add((byte)(mqttBytes.Length >> 8));
            list.Add((byte)(mqttBytes.Length & 255));
            list.AddRange(mqttBytes);

            list.Add(Version);//协议版本
            list.Add(ConnectFlag);//毗邻标识

            list.Add((byte)(KeepAlive >> 8));//心跳值
            list.Add((byte)(KeepAlive & 255));

            var clientIdBytes = ClientId.ToBytes(Encoding.ASCII);//客户端编号
            list.Add((byte)(clientIdBytes.Length >> 8));
            list.Add((byte)(clientIdBytes.Length & 255));
            list.AddRange(clientIdBytes);

            if (HasUserName)//是否包罗用户名
            {
                var userNameBytes = UserName.ToBytes(Encoding.ASCII);
                list.Add((byte)(userNameBytes.Length >> 8));
                list.Add((byte)(userNameBytes.Length & 255));
                list.AddRange(userNameBytes);
            }
            if (HasPassword)//是否包罗用户密码
            {
                var passwordBytes = Password.ToBytes(Encoding.ASCII);
                list.Add((byte)(passwordBytes.Length >> 8));
                list.Add((byte)(passwordBytes.Length & 255));
                list.AddRange(passwordBytes);
            }
            Data = list.ToArray();
            list.Clear();
            return base.ToBytes();
        }
View Code

 

 毗邻回复包花样:

 

 

形貌

7

6

5

4

3

2

1

0

毗邻确认标志

Reserved     保留位

1 SP

 

 

 

 

 

 

 

byte       1

 

0

0

0

0

0

0

0

X

毗邻返回码

 

 

 

 

 

 

 

 

 

byte       2

 

X

X

X

X

X

X

X

X

 

形貌

7

6

5

4

3

2

1

0

毗邻确认标志

 

Reserved      保留位

 

 

1 SP

byte       1

 

0

0

0

0

0

0

0

X

 

 

毗邻返回码

 

 

 

byte       2

 

X

X

X

X

X

X

X

X

,犹豫代码比较简朴这里就不贴了

 2.3、PUBLISH      –     公布新闻

PUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用新闻

 

 

主题长度 16位  2字节

主题内容N

若是QoS大于0 则有一个新闻Id   16位  2字节

剩余的N 是新闻的主题

 

 

形貌

7

6

5

4

3

2

1

0

Topic     Name    主题名

 

 

 

 

 

 

 

 

 

byte       1

Length   MSB      (0)

0

0

0

0

0

0

0

0

byte       2

Length   LSB       (3)

0

0

0

0

0

0

1

1

byte       3

‘a’    (0x61)

0

1

1

0

0

0

0

1

byte       4

‘/’     (0x2F)

0

0

1

0

1

1

1

1

byte       5

‘b’    (0x62)

0

1

1

0

0

0

1

0

报文标识符

 

 

 

 

 

 

 

 

 

byte       6

报文标识符  MSB       (0)

0

0

0

0

0

0

0

0

byte       7

报文标识符  LSB       (10)

0

0

0

0

1

0

1

0

代码按协议花样组装如下:

/// <summary>
        /// 最先组装包。
        /// </summary>
        protected override void Packaging()
        {
            var topicBytes = Topic.ToBytes();//主题数据
            Data = new byte[topicBytes.Length + BodyBytes.Length + (QoS > 0 ? 4 : 2)];
            Data[0] = (byte)(topicBytes.Length >> 8);
            Data[1] = (byte)(topicBytes.Length & 255);
            topicBytes.CopyTo(Data, 2);
            if (QoS > 0)
            {
                Data[topicBytes.Length + 2] = (byte)(Identifier >> 8);
                Data[topicBytes.Length + 3] = (byte)(Identifier & 255);
            }
            BodyBytes.CopyTo(Data, Data.Length - BodyBytes.Length);//复制新闻内容
            topicBytes = null;
        }

 

 后面的花样我就不在逐一写出来了 ,附上一个mqtt协议文档

MQTT协议中文版

这里另有一个异常重要的自己写的一个Socket 辅助类,主要实现高性能的读取和发送新闻,已整包的形式,这样制止粘包等问题

/// <summary>
    /// Berkeley 套接字 辅助
    /// </summary>
    public abstract class SocketClient<Package>
        where Package : IDataPackage
    {
        private Socket m_Socket;
        private Timer timerHeartbeat;
        private System.Net.EndPoint remoteEndPoint;
        /// <summary>
        /// 客户端唯一标识
        /// </summary>
        public virtual long Id { get; set; }
        /// <summary>
        ///  Berkeley 套接字。
        /// </summary>
        public virtual Socket Socket { get => m_Socket; protected set { m_Socket = value; remoteEndPoint = m_Socket.RemoteEndPoint; } }


        /// <summary>
        /// 客户端的远程信息。
        /// </summary>
        public virtual System.Net.EndPoint RemoteEndPoint { get => remoteEndPoint; }
        /// <summary>
        /// 心跳线程
        /// </summary>
        protected virtual Timer TimerHeartbeat { get => timerHeartbeat; }

        /// <summary>
        /// 心跳时间。
        /// </summary>
        public virtual int KeepAlive { get; set; } = 180000;
        /// <summary>
        /// 初始化
        /// </summary>
        /// <param name="socket"></param>
        protected SocketClient(Socket socket)
        {

            Socket = socket;
        }
        /// <summary>
        /// 初始化
        /// </summary>
        protected SocketClient()
        {

        }
        /// <summary>
        /// 读取一个完整的包。
        /// </summary>
        /// <returns></returns>
        protected abstract DotNet.Result<Package> ReceivePackage();
        /// <summary>
        /// 最先循环读取新闻。
        /// </summary>
        public virtual void OnReceive()
        {
            while (!IsClose)
            {
                try
                {
                    OnHeartbeatTimer();
                    var bytesResult = ReceivePackage();
                    if (bytesResult.Success)
                    {
                        OnNewDataPackage(bytesResult);
                    }
                    else
                    {
                        WriteLog($"吸收包时错误,错误内容:{bytesResult.Message}");
                        if (bytesResult.Code == -1)
                        {
                            this.Close();
                        }
                    }
                }
                catch (Exception ex)
                {
                    WriteErrorLog($"吸收包时异常", ex);
                }
            }
            Close();
        }
        /// <summary>
        /// 当吸收到
        /// </summary>
        /// <param name="bytesResult"></param>
        protected virtual void OnNewDataPackage(Result<Package> bytesResult)
        {
            try
            {
                // 这里使用异步会有一个问题,就是若是一个客户端while(true)在发新闻,会导致服务器线程被一个客户端占满而无法处置其他的客户端。
                OnHandleDataPackage(bytesResult.Data);
            }
            catch (Exception ex)
            {
                WriteErrorLog($"客户端处置包时报错", ex);
            }
        }

#if NET40
        /// <summary>
        /// 启用异步读取
        /// </summary>
        /// <returns></returns>
        public virtual Task OnReceiveAsync()
        {
            return Task.Factory.StartNew(OnReceive);
        }
#else
        /// <summary>
        /// 启用异步读取
        /// </summary>
        /// <returns></returns>
        public virtual async Task OnReceiveAsync()
        {
            await Task.Run(() =>
            {
                OnReceive();
            });
        }
#endif

        private bool m_IsClose;
        /// <summary>
        /// 是否已经关闭
        /// </summary>
        public virtual bool IsClose => m_IsClose;
        /// <summary>
        /// 关闭毗邻,并退出当前线程
        /// </summary>
        public virtual void Close(int timeout = 3)
        {
            lock (this)
            {
                if (!IsClose)
                {
                    m_IsClose = true;
                    WriteLog($"关闭毗邻");

                    OnClose();
                    //真正关闭,制止二次关闭
                }
            }
            Socket?.Close(timeout);
            Socket?.Dispose();
            timerHeartbeat?.Dispose();
        }
        /// <summary>
        /// 关闭毗邻并退出。
        /// </summary>
        protected abstract void OnClose();
        /// <summary>
        /// 设置心跳计数器
        /// </summary>
        protected virtual void OnHeartbeatTimer()
        {
            if (timerHeartbeat == null)
            {
                timerHeartbeat = new Timer(OnHeartbeatTimerCallback, this, KeepAlive, KeepAlive);
            }
            else
            {
                timerHeartbeat.Change(KeepAlive, KeepAlive);
            }
        }
        /// <summary>
        /// 心跳现实到达后触发,改方式又心跳计数器执行。
        /// </summary>
        /// <param name="state"></param>
        protected virtual void OnHeartbeatTimerCallback(object state)
        {
            WriteLog($"客户端{KeepAlive}s未发包,已甩掉");
            Close();
        }
        /// <summary>
        /// 写入日志。
        /// </summary>
        /// <param name="text">日志内容</param>
        public virtual void WriteLog(string text)
        {
          // Log.WriteLog($" 毗邻{RemoteEndPoint}-{text}");
        }
        /// <summary>
        /// 写入错误信息到日志。
        /// </summary>
        /// <param name="text">错误信息形貌</param>
        /// <param name="exception">异常信息</param>
        public virtual void WriteErrorLog(string text, Exception exception = null)
        {
           // Log.WriteErrorLog($" 毗邻{RemoteEndPoint}-{text}", exception);
        }
        /// <summary>
        /// 写入日志。
        /// </summary>
        /// <param name="text">日志内容</param>
        /// <param name="args"></param>
        public virtual void WriteLog(string text, params object[] args)
        {
            WriteLog(string.Format(text, args));
        }
        /// <summary>
        /// 最先处置吸收的包
        /// </summary>
        /// <param name="dataPackage"></param>
        protected abstract void OnHandleDataPackage(Package dataPackage);
        /// <summary>
        /// 发送数据
        /// </summary>
        /// <param name="bytes"></param>
        public virtual Result SendBytes(byte[] bytes)
        {
            lock (this)
            {
                if (!IsClose)
                {
                    try
                    {
                        Socket.Send(bytes);
                        return true;
                    }
                    catch (Exception ex)
                    {
                        WriteErrorLog($"发送数据{bytes.ToBase64String()}", ex);
                        if (!Socket.Connected)
                        {
                            Close();
                        }
                    }

                }
            }
            return false;
        }
    }

 

其mqtt的Socket实现子类如下:

/// <summary>
    /// mqtt 服务器毗邻过来的客户端。
    /// </summary>
    public class MQTTSocketClient : DotNet.Net.SocketClient<MQTTDataPackage>
    {
        /// <summary>
        /// 示意mqtt服务器。
        /// </summary>
        public virtual MQTTServer TcpServer { get; set; }
        /// <summary>
        /// 获取一个值,该值指示客户端是否发送过了毗邻协议包。
        /// </summary>
        public virtual bool IsConnect { get; protected set; }
        private readonly List<TopicDataPackage> subscribeTopics = new List<TopicDataPackage>();
        /// <summary>
        /// 订阅主题。
        /// </summary>
        public TopicDataPackage[] SubscribeTopics { get => subscribeTopics.ToArray(); }
        /// <summary>
        /// 当前新闻序号
        /// </summary>
        public virtual ushort Identifier { get; set; }
        /// <summary>
        /// 客户端毗邻编号
        /// </summary>
        public virtual string ClientId { get; set; }
        /// <summary>
        /// 客户端唯一毗邻id
        /// </summary>
        public override long Id
        {
            get
            {
                if (long.TryParse(ClientId, out long id))
                {
                    base.Id = id;
                }
                else
                {
                    base.Id = ClientId.GetHashCode();
                }
                return base.Id;
            }
            set
            {
                ClientId = value.ToString();
            }
        }
        /// <summary>
        /// 写日志。
        /// </summary>
        /// <param name="text"></param>
        public override void WriteLog(string text)
        {
            if (ClientId != null)
            {
                text = $"客户端编号:{ClientId}:{text}";
            }
            // base.WriteLog(text);
        }
        /// <summary>
        /// 使用<see cref="Socket"/>客户端初始化。
        /// </summary>
        /// <param name="socket"></param>
        public MQTTSocketClient(Socket socket) : base(socket)
        {

        }
        /// <summary>
        /// 关闭服务端毗邻
        /// </summary>
        protected override void OnClose()
        {
            Console.WriteLine($"{ClientId}关闭毗邻");
        }

        /// <summary>
        /// 处置收到的包
        /// </summary>
        /// <param name="dataPackage"></param>
        protected override void OnHandleDataPackage(MQTTDataPackage dataPackage)
        {

            try
            {

                WriteLog($"收到{dataPackage.MessageType} 包,    QoS level:{dataPackage.QoS}");

                if (IsConnect && dataPackage.MessageType != MessageType.Connect)
                {
                    WriteLog($"收到{dataPackage.MessageType} 包,    QoS level:{dataPackage.QoS} ,但毗邻尚未登录,被甩掉");
                    this.Close();

                }

                switch (dataPackage.MessageType)
                {
                    case MessageType.Connect:
                        OnConnect(dataPackage);
                        break;
                    case MessageType.Subscribe:
                        OnSubscribe(dataPackage);
                        break;
                    case MessageType.PingRequest:
                        OnPingRequest(dataPackage);
                        break;
                    case MessageType.Publish:
                        OnPublishPackage(dataPackage);
                        break;
                    case MessageType.UnSubscribe:
                        OnUnSubscribe(dataPackage);
                        break;
                    case MessageType.Disconnect:
                        this.Close();
                        break;
                }
            }
            catch (Exception ex)
            {

            }
            dataPackage = null;

        }
#if NET40
        /// <summary>
        /// 当收到公布新闻
        /// </summary>
        /// <param name="dataPackage"></param>
        protected virtual Task OnPublishPackage(MQTTDataPackage dataPackage)
        {
            return Task.Factory.StartNew(() =>
            {
#else
        /// <summary>
        /// 当收到公布新闻
        /// </summary>
        /// <param name="dataPackage"></param>
        /// <returns></returns>
        protected virtual async Task OnPublishPackage(MQTTDataPackage dataPackage)
        {
            await Task.Run(() =>
            {
#endif
                try
                {
                    PublishDataPackage publishDataPackage = new PublishDataPackage(dataPackage);
                    var result = OnPublish(publishDataPackage);
                    if (dataPackage.QoS > 0)
                    {
                        var package = new MQTTDataPackage() { MessageType = MessageType.PublishAck, Data = new byte[3] { (byte)(publishDataPackage.Identifier >> 8), (byte)(publishDataPackage.Identifier & 255), 0 } };
                        if (dataPackage.QoS == 1)
                        {
                            if (!result.Success)
                            {
                                package.Data[2] = 1;
                            }
                            //SendPackage(package);
                        }
                    }
                }
                catch (Exception ex)
                {

                }
            });
        }
        /// <summary>
        /// 当客户公布新闻。
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        protected virtual Result OnPublish(PublishDataPackage message)
        {
            WriteLog($"客户端{message.ClientId}公布新闻{message.Topic},QoS{message.QoS}。内容:{message.Text}");
            try
            {
                foreach (var client in TcpServer.Clients)
                {
                    foreach (var topic in client.SubscribeTopics)
                    {
                        if (MqttTopicFilterComparer.IsMatch(message.Topic, topic.Topic))
                        {
                            var temp = message.Clone();
                            temp.QoS = 0;// Math.Min(message.QoS, topic.QoS);//mqtt协议划定,取订阅主题和发送主题中最小的qos值。
                            client.Publish(temp);
                        }
                    }
                }
            }
            catch (Exception ex)
            {

            }
            return true;
        }
        /// <summary>
        /// 公布新闻。
        /// </summary>
        /// <param name="message">要公布的新闻。</param>
        /// <returns></returns>
        public virtual Result Publish(PublishDataPackage message)
        {
            message.Identifier = ++Identifier;
            this.SendPackage(message);//现在不校验,qos 直接发送
            return true;
        }
        /// <summary>
        /// 当客户端发送了ping 请求
        /// </summary>
        /// <param name="dataPackage"></param>
        protected virtual void OnPingRequest(MQTTDataPackage dataPackage)
        {
            var package = new MQTTDataPackage() { MessageType = MessageType.PingResponse };
            SendPackage(package);
        }
        /// <summary>
        /// 发生订阅新闻
        /// </summary>
        /// <param name="dataPackage"></param>
        private void OnSubscribe(MQTTDataPackage dataPackage)
        {

            TopicDataPackage topicDataPackage = new TopicDataPackage(dataPackage);
            var result = OnSubscribe(topicDataPackage);
            var package = new SubscribeAckDataPackage() { Identifier = topicDataPackage.Identifier, Success = result.Success };
            if (result.Success)
            {
                if (!subscribeTopics.Contains(topicDataPackage))
                {
                    subscribeTopics.Add(topicDataPackage);
                }
                package.ValidQos = Qos.QoS2;//
            }
            SendPackage(package);
        }
        /// <summary>
        /// 作废订阅新闻
        /// </summary>
        /// <param name="dataPackage"></param>
        private void OnUnSubscribe(MQTTDataPackage dataPackage)
        {
            TopicDataPackage topicDataPackage = new TopicDataPackage(dataPackage);
            var result = OnUnSubscribe(topicDataPackage);
            if (result.Success)
            {
                if (subscribeTopics.Contains(topicDataPackage))
                {
                    subscribeTopics.Remove(topicDataPackage);
                }
                var package = new IdentifierAckDataPackage(MessageType.UnSubscribeAck) { Identifier = topicDataPackage.Identifier };
                SendPackage(package);
            }
        }
        /// <summary>
        /// 当收到 作废订阅主题新闻时。
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        protected virtual Result OnUnSubscribe(TopicDataPackage message)
        {
            WriteLog($"客户端{message.ClientId} 作废订阅{message.Topic},QoS{message.QoS}");
            return true;
        }
        /// <summary>
        /// 当收到订阅主题新闻时。
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        protected virtual Result OnSubscribe(TopicDataPackage message)
        {
            WriteLog($"客户端{message.ClientId}订阅{message.Topic},QoS{message.RequestedQoS}");
            return true;
        }
        /// <summary>
        /// 当客户端发送毗邻请求时。
        /// </summary>
        /// <param name="dataPackage">毗邻请求的包</param>
        private void OnConnect(MQTTDataPackage dataPackage)
        {
            ConnectDataPackage connectDataPackage = new ConnectDataPackage(dataPackage);
            var result = OnClientConnect(connectDataPackage);
            var client = TcpServer.GetClientById(connectDataPackage.ClientId);
            if (client.Success)
            {
                client.Data.WriteLog($"新的客户端毗邻{this.RemoteEndPoint}上线,旧毗邻关闭");
                client.Data.Close();
            }
            ClientId = connectDataPackage.ClientId;
            this.KeepAlive = Convert.ToInt32(connectDataPackage.KeepAlive * 1000 * 1.5);
            var package = new ConnectAckDataPackage() { Result = result };
            SendPackage(package);

        }
        /// <summary>
        /// 发送一个尺度的mqtt包到客户端毗邻。
        /// </summary>
        /// <param name="package"></param>
        public virtual void SendPackage(MQTTDataPackage package)
        {
            WriteLog($"发送{package.MessageType}包,QOS:{package.QoS}");
            this.SendBytes(package.ToBytes());
        }
        /// <summary>
        /// 当客户端毗邻到服务验证是否可以毗邻
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        protected virtual Result OnClientConnect(ConnectDataPackage message)
        {
            WriteLog($"客户端{message.ProtocolName}毗邻,客户端编号{message.ClientId},用户名:{message.UserName},密码:{message.Password},CeanSession:{message.CeanSession}");
            return true;
        }
        /// <summary>
        /// 吸收一个完整的包。
        /// </summary>
        /// <returns></returns>
        protected override Result<MQTTDataPackage> ReceivePackage()
        {

            Result<byte[]> result;
            Result<MQTTDataPackage> resultPackage = new Result<MQTTDataPackage>() { Success = false };
            MQTTDataPackage package = new MQTTDataPackage() { ClientId = ClientId, RemoteEndPoint = RemoteEndPoint };
            result = this.Socket.ReceiveBytes(1);
            if (!result.Success)
            {
                WriteLog("获取mqtt 头 首字节失败");
                this.Close();
                return resultPackage;
            }
            package.Header = result.Data[0];
            var msgLengthResult = ReadLength();
            if (!msgLengthResult.Success)
            {
                WriteLog(msgLengthResult.Message);
                return resultPackage;
            }
            result = this.Socket.ReceiveBytes(msgLengthResult.Data);
            if (!result.Success)
            {
                WriteLog($"获取数据长度{msgLengthResult.Data}内容失败");
                return resultPackage;
            }
            package.Data = result.Data;
            resultPackage.Data = package;
            resultPackage.Success = true;
            resultPackage.Message = "获取包乐成";
            return resultPackage;
        }
        /// <summary>
        /// 获取一个长度数据
        /// </summary>
        /// <returns></returns>
        protected virtual Result<int> ReadLength()
        {
            var result = this.Socket.ReceiveBytes(1);
            if (!result.Success)
            {
                WriteLog("获取mqtt 长度失败");
                return new Result<int>(result);
            }
            var msgType = result.Data[0];
            var msgLength = msgType & 127;//取低7为的值,由于可变长度有用值只有低7位,第8位用来标识下一个字节是否属于长度字节
            var leftBit = 7;
            while (msgType >> 7 == 1)//判断最高位是否为1,若是为1则说明后面的1个字节也是属于长度字节
            {
                result = this.Socket.ReceiveBytes(1);
                if (!result.Success)
                {
                    WriteLog("获取mqtt 长度失败");
                    return new Result<int>(result);
                }
                msgType = result.Data[0];
                msgLength = ((msgType & 127) << leftBit) | msgLength;// 由于mqtt 可变长度的字节是低位在前,以是新取到的长度要左移取到的次数*7位在|原来的长度。
                leftBit += 7;
            }
            return msgLength;
        }
    }

 

,

欧博Allbet

欢迎进入欧博Allbet官网(Allbet Game):www.aLLbetgame.us,欧博官网是欧博集团的官方网站。欧博官网开放Allbet注册、Allbe代理、Allbet电脑客户端、Allbet手机版下载等业务。

版权声明

本文仅代表作者观点,
不代表本站Allbet欧博官网的立场。
本文系作者授权发表,未经许可,不得转载。

评论