前一段时间在优化行情数据系统,整个系统的大概流程是从第三方数据服务商获取行情数据,然后通过专门的网络传输到自有的服务器上进一步分析处理。由于网络带宽有限,随着行情源数据流量的不断增大,有必要对流量进行优化,所以需要对数据传输流进行压缩,以减少数据传输过程中的数据流量。

    最开始使用的C#里面内置GZip压缩算法,将二进制转换为流,然后压缩传输。现在需要去调研一下有没有更好的压缩算法,我这里记录一下我的解决思路。

为什么需要压缩


    压缩在本质上,就是将数据换一种方式来进行存储,比如一种最常见的思路是:如果数据里面重复的内容比较多,那么可以简单的将这些重复的内容只记录一次,然后用其它字段记录该内容的存储位置。一般来说,数据的重复越多,压缩的效果就越好。

    不同的领域,处理的数据特性不同。比如在股票行情领域,它属于时序数据,数据冗余度和相似度较高,天然适合进行压缩。比如下面某一时刻某只股票的行情:

▲ 某一时刻,某只股票的行情信息

    可以看到,在买卖十档数据中,有很多重复数字,比如价格里面的1834,前面两位或者三位基本都是一样的,再比如在逐比成交里面,有大量相同的成交价格。另外,T时刻与T+1时刻的数据,可能也有很多重复。假如在这一段时间内,没有交易产生,那么T和T+1时刻的数据就会是一样的。

    因此,这种时序数据,天然适合进行压缩,压缩能极大的减少数据量。

    当然,压缩和解压是需要耗费时间的,但是整个行情系统的时间它实际上是包括数据处理加上传输的时间,如果数据处理阶段能够通过消耗一定的时间减少数据量,那么在数据传输时间就能极大的减少数据传输的耗时,整体上是能够提升效率的。

选取压缩算法需要考虑的因素


     压缩算法有很多,可以按照不同的特性进行划分,比如无损压缩和有损压缩,无损压缩是指压缩后能够解压回原来的数据,数据质量不会丢失,在行情数据的处理过程中,很明显需要采用无损压缩算法。有损压缩是指压缩后,数据精度可能会有损失,不能够复原,比如最常见的是图片的处理,将图片从高分辨率处理为低分辨率、大图片调整为小图片,这种调整是不可逆的,从低分辨率调整为高分辨,数据精度和质量就会丢失。

   另外需要考虑的就是压缩比和压缩时间。这本质上是一种时间和空间的矛盾。一些压缩算法压缩解压快,但是压缩率低;一些算法压缩解压慢,但是压缩率比较高。在有些场景下,对时间比较敏感,所以就可以选取压缩解压耗时小的算法,比如实时行情处理,它对时间有极高的要求;而有些场景对时间不敏感,但对空间很敏感,比如如果要将历史行情数据进行落地存储,因为数据量大磁盘空间有限,所以需要选择压缩率比较高的算法以节约磁盘空间。

    另外,被压缩数据块的大小可能也会对压缩时间和压缩率产生影响,比如对过小的数据进行压缩,可能压缩后的体积比压缩前还要大。所以需要考虑压缩数据的大小对同一压缩算法在压缩率方面的影响。

比较和分析


    除了GZip压缩算法之外,需要寻找能供使用的其它无损压缩算法。有两个思路:

  • 一种是将关键字转换为对英文支持较好的搜索引擎比如Google去查找。比如我这里需要的是无损压缩算法,所以可以使用“lossless compression algorithm”作为关键字去搜索,在确定好算法之后,可以进一步看是否有对应的.NET版本。一般都会有,这些压缩算法为求高效率一般会使用C或者C++编写,.NET可以通过P/Invoke技术非常方便与其它类库进行互操作,所以常用的比较流行的算法都会有.NET的版本,不论是原生实现还是对其它类库的包装。
  • 第二种方法就是直接在Github上,使用以上关键字去搜索。

    经过一番查找,我找到了以下几种无损压缩算法:

  • GZip,这种已经是内置的,但它有两种实现,一种是.NET内置的System.IO.Compression.GZipStream,现有的系统使用的就是这种;还有一种是第三方的实现SharpZipLib
  • Zstd是facebook开源的提供高压缩比的快速压缩算法,读做Zstandard。它是一种快速的无损压缩算法,主要应用于 zlib 级别的实时压缩方案,并且具有更好的压缩比。它还可以以压缩速度为代价提供更强的压缩比,速度与压缩率可配置,是一项性能优秀的压缩技术,与 zlib、lz4、xz 等压缩算法不同,Zstd 寻求的是压缩性能与压缩率通吃的方案。针对Zstd的.NET包装有ZstdNet类库,它使用P/Invoke对Zstd库进行了封装,目前版本是1.4.5,它是对Zstd 1.4.5版本的封装。Zstd有更新的版本,直接用最新的Zstd版本放到ZstdNet类库下面也能直接使用,因为签名没有发生变化。
  • Lz4压缩算法是由YannCollet在2011年设计实现的,lz4属于lz77系列的压缩算法。lz77严格意义上来说不是一种算法,而是一种编码理论,它只定义了原理,并没有定义如何实现。lz4是目前基于综合来看效率最高的压缩算法,更加侧重于压缩解压缩速度,压缩比并不突出,本质上就是时间换空间。它的.NET实现有K4os.Compression.LZ4
  • Snappy 是一个 C++ 的用来压缩和解压缩的开发包。其目标不是最大限度压缩或者兼容其他压缩格式,而是旨在提供高速压缩速度和合理的压缩率。Snappy 比 zlib 更快,但文件相对要大。Snappy 在 Google 内部被广泛的使用,从 BigTable 到 MapReduce 以及内部的 RPC 系统,他的.NET实现是snappy.net

    需要注意的是,因为.NET平台有很多版本,由于当前系统使用的是.NET 4.5,所以上面的.NET版本采用的都是.NET 4.5或者兼容.NET 4.5版本。在测试之前,新建一个接口,提供压缩解压方法:

public interface ICompressor
{
    byte[] Compressor(byte[] data);
    byte[] Decompressor(byte[] data);
}

     然后就是各种不同算法的实现类,它们都实现这个接口。下面分别是.NET内置的GZip、ISharpZipLib的GZip、ZStd、Lz4、和Snappy的实现:

public class MSGZip : ICompressor
{
    public byte[] Compressor(byte[] data)
    {
        using (var ms = new MemoryStream())
        {
            using (var gzipstream = new GZipStream(ms, CompressionMode.Compress))
            {
                gzipstream.Write(data, 0, data.Length);
            }
            return ms.ToArray();
        }
    }

    public byte[] Decompressor(byte[] data)
    {
        using (var ms = new MemoryStream(data))
        {
            using (var gzs = new GZipStream(ms, CompressionMode.Decompress))
            {
                using (var outBuf = new MemoryStream())
                {
                    gzs.CopyTo(outBuf);
                    return outBuf.ToArray();
                }
            }
        }
    }
}

public class SharpZipGzip : ICompressor
{
    public byte[] Compressor(byte[] data)
    {
        using (var ms = new MemoryStream())
        {
            using (var gzipstream = new GZipOutputStream(ms))
            {
                gzipstream.Write(data, 0, data.Length);
            }
            return ms.ToArray();
        }
    }

    public byte[] Decompressor(byte[] data)
    {
        using (var ms = new MemoryStream(data))
        {
            using (var gzs = new GZipInputStream(ms))
            {
                using (var outBuf = new MemoryStream())
                {
                    gzs.CopyTo(outBuf);
                    return outBuf.ToArray();
                }
            }
        }
    }
}

public class ZStd : ICompressor
{
    private Compressor compressor = new Compressor();
    private Decompressor decompressor = new Decompressor();
 
    public byte[] Compressor(byte[] data)
    {
        return compressor.Wrap(data);
    }

    public byte[] Decompressor(byte[] data)
    {
        return decompressor.Unwrap(data);
    }
}

public class LZ4 : ICompressor
{
    public byte[] Compressor(byte[] data)
    {
        return LZ4Pickler.Pickle(data);
    }

    public byte[] Decompressor(byte[] data)
    {
        return LZ4Pickler.Unpickle(data);
    }
}

public class Snappy : ICompressor
{
    public byte[] Compressor(byte[] data)
    {
        using (var ms = new MemoryStream())
        {
            using (var gzipstream = new SnappyStream(ms, System.IO.Compression.CompressionMode.Compress))
            {
                gzipstream.Write(data, 0, data.Length);
            }
            return ms.ToArray();
        }
    }

    public byte[] Decompressor(byte[] data)
    {
        using (var ms = new MemoryStream(data))
        {
            using (var gzs = new SnappyStream(ms, System.IO.Compression.CompressionMode.Decompress))
            {
                using (var outBuf = new MemoryStream())
                {
                    gzs.CopyTo(outBuf);
                    return outBuf.ToArray();
                }
            }
        }
    }
}

      有了以上方法之后,我们需要为每个需要测试的项目来新建测试类,比如我这里需要测试,压缩率、压缩时间、压缩解压时间,所以分别新建了三个类CompressorRatioBenchMark、CompressorTimeBenchMark、CompressorDecompressorTimeBenchMark,这些类大同小异,我这里以测试压缩率的CompressorRatioBenchMark类来说明:

public class CompressorRatioBenchMark
{
    private int zipCount = 1;
    List<byte[]> testDatas = new List<byte[]>();
    private bool useByteArrayCompressor = true;
    public CompressorRatioBenchMark(DataFeedDataType dateType, int zcount = 1, int dataCount = 1, bool useByteArrayCompressor = true)
    {
        this.useByteArrayCompressor = useByteArrayCompressor;
        zipCount = zcount;
        for (int i = 0; i < zipCount; i++)
        {
            byte[] b = TestSendData.GetTestData(dateType, dataCount);
            testDatas.Add(b);
        }
    }
 
    public double MSGZip()
    {
        ICompressor zip = new MSGZip();
        return TestCompressorRatio(zip);
    }
 
    public double SharpZipGzip()
    {
        ICompressor zip = new SharpZipGzip();
        return TestCompressorRatio(zip);
    }
 
    public double ZStd()
    {
        ICompressor zip = new ZStd();
        return TestCompressorRatio(zip);
    }
 
    public double LZ4()
    {
        ICompressor zip = new LZ4();
        return TestCompressorRatio(zip);
    }
 
    public double Snappy()
    {
        ICompressor zip = new Snappy();
        return TestCompressorRatio(zip);
    }

    private double TestCompressorRatio(ICompressor compressor)
    {
        double totalOriginal = 0;
        double totalZipped = 0;
        for (int i = 0; i < testDatas.Count; i++)
        {
            byte[] original = testDatas[i];
            totalOriginal += original.Length;
            if (useByteArrayCompressor)
            {
                byte[] zipeed = compressor.Compressor(original);
                totalZipped += zipeed.Length;
            }
            else
            {
                byte[] zipeed = compressor.CompressorStream(original);
                totalZipped += zipeed.Length;
            }
        }
        return totalZipped / totalOriginal;
    }

}

      构造函数里有一些参数,第一个DataType表示数据类型,比如我这里有Level2快照数据、逐比成交数据、全速盘口数据类型,这些类型对应不同的单个测试数据格式,最主要是大小不同。第二个参数是测试次数,比如测试10次,那么就是随机生成10份数据,进行压缩,然后取平均值,第三个参数是单个数据类型的大小,比如我这里设置为2,那么就是把2两个DataType表示的数据类型,拼成一个byte[]数组,这个要用到BlockCopy:

public static byte[] Combine(byte[] first, byte[] second)
{
    byte[] ret = new byte[first.Length + second.Length];
    Buffer.BlockCopy(first, 0, ret, 0, first.Length);
    Buffer.BlockCopy(second, 0, ret, first.Length, second.Length);
    return ret;
}

    最后一个参数表示是用二进制数组还是流进行压缩,我这里只演示了对二进制数组的压缩。如果要测试流的压缩,需要在接口中添加新的方法,各个类里面也需要添加相应的实现。

    实现了上面三个类之后,需要增加一个管理类BenchMarkManual,用来统筹测试项目:

public enum CompressorCategory
{
    Ratio = 1,
    CompressionTime = 2,
    CompressionAndDecompressionTime = 3
}
public class ResultInfo
{
    public static string GetDisplayHeader()
    {
        return $"DataType,CompressionCategory,DataCount,CompressorTimes,GZip,SharpZiGzip,ZSTD,LZ4,Snappy";
    }
    public DataFeedDataType DataType;
    public int DataCount;
    public int CompressorTimes;
    public CompressorCategory CompressionCategory;
    public double GZip;
    public double SharpZiGzip;
    public double ZSTD;
    public double LZ4;
    public double Snappy;
    public override string ToString()
    {
        return $"{DataType},{CompressionCategory},{DataCount},{CompressorTimes},{GZip},{SharpZiGzip},{ZSTD},{LZ4},{Snappy}";
    }
}
public class BenchMarkManual
{
    private bool useByteArrayCompression;
    private DataFeedDataType dtype;
    private int dataCount;
    private bool warmUp;
    private int comptimes;
    public BenchMarkManual(DataFeed.DataFeedDataType dataType, int times, int dc, bool useBinary)
    {
        useByteArrayCompression = useBinary;
        dtype = dataType;
        comptimes = times;
        dataCount = dc;
        warmUp = true;
        CompressionRatio();
        CompressionTime();
        CompressionAndDecompressionTime();
        warmUp = false;
    }
    public ResultInfo CompressionRatio()
    {
        ResultInfo result = new ResultInfo();
        result.CompressionCategory = CompressorCategory.Ratio;
        result.DataType = dtype;
        result.DataCount = dataCount;
        result.CompressorTimes = comptimes;
        CompressorRatioBenchMark rate = new CompressorRatioBenchMark(dtype, comptimes, dataCount, useByteArrayCompression);
        double msgzip = rate.MSGZip();
        double sharpZipGzip = rate.SharpZipGzip();
        double lz4 = rate.LZ4();
        double zstd = rate.ZStd();
        double snoopy = rate.Snappy();
        if (!warmUp)
        {
            string compressorMethod;
            if (useByteArrayCompression)
            {
                compressorMethod = "bytearray";
            }
            else
            {
                compressorMethod = "memorystream";
            }
            Console.WriteLine($"{dtype}(size*{dataCount}) :avg compress rate ({comptimes} times)   use {compressorMethod} compressor method:");
            Console.WriteLine("gzip:" + msgzip);
            Console.WriteLine("sharpzip_gzip:" + sharpZipGzip);
            Console.WriteLine("lz4:" + lz4);
            Console.WriteLine("zstd:" + zstd);
            Console.WriteLine("snappy:" + snoopy);
            Console.WriteLine("***********************************");
            result.GZip = msgzip;
            result.SharpZiGzip = sharpZipGzip;
            result.LZ4 = lz4;
            result.ZSTD = zstd;
            result.Snappy = snoopy;
        }
        return result;
    }

    public ResultInfo CompressionTime()
    {
        ResultInfo result = new ResultInfo();
        result.CompressionCategory = CompressorCategory.CompressionTime;
        result.DataType = dtype;
        result.DataCount = dataCount;
        result.CompressorTimes = comptimes;
        CompressorTimeBenchMark rate = new CompressorTimeBenchMark(dtype, comptimes, dataCount, useByteArrayCompression);
        double msgzip = rate.MSGzip();
        double sharpZipGzip = rate.SharpZipGzip();
        double lz4 = rate.LZ4();
        double zstd = rate.ZSTD();
        double snoopy = rate.Snappy();
        if (!warmUp)
        {
            string compressorMethod;
            if (useByteArrayCompression)
            {
                compressorMethod = "bytearray";
            }
            else
            {
                compressorMethod = "memorystream";
            }
            Console.WriteLine($"{dtype}(size*{dataCount}):avg compress time ({comptimes} times) ticks use {compressorMethod} compressor method:");
            Console.WriteLine("gzip:" + msgzip);
            Console.WriteLine("sharpzip_gzip:" + sharpZipGzip);
            Console.WriteLine("lz4:" + lz4);
            Console.WriteLine("zstd:" + zstd);
            Console.WriteLine("snappy:" + snoopy);
            Console.WriteLine("***********************************");
            result.GZip = msgzip;
            result.SharpZiGzip = sharpZipGzip;
            result.LZ4 = lz4;
            result.ZSTD = zstd;
            result.Snappy = snoopy;
        }
        return result;
    }

    public ResultInfo CompressionAndDecompressionTime()
    {
        ResultInfo result = new ResultInfo();
        result.CompressionCategory = CompressorCategory.CompressionAndDecompressionTime;
        result.DataType = dtype;
        result.DataCount = dataCount;
        result.CompressorTimes = comptimes;
        CompressorUnZipTimeBenchMark rate = new CompressorUnZipTimeBenchMark(dtype, comptimes, dataCount, useByteArrayCompression);
        double msgzip = rate.MSGzip();
        double sharpZipGzip = rate.SharpZipGzip();
        double lz4 = rate.LZ4();
        double zstd = rate.ZSTD();
        double snoopy = rate.Snappy();
        if (!warmUp)
        {
            string compressorMethod;
            if (useByteArrayCompression)
            {
                compressorMethod = "bytearray";
            }
            else
            {
                compressorMethod = "memorystream";
            }
            Console.WriteLine($"{dtype}(size*{dataCount}) :avg compress-decompress time ({comptimes} times) milliseconds use {compressorMethod} compressor method:");
            Console.WriteLine("gzip:" + msgzip);
            Console.WriteLine("sharpzip_gzip:" + sharpZipGzip);
            Console.WriteLine("lz4:" + lz4);
            Console.WriteLine("zstd:" + zstd);
            Console.WriteLine("snappy:" + snoopy);
            Console.WriteLine("***********************************");
            result.GZip = msgzip;
            result.SharpZiGzip = sharpZipGzip;
            result.LZ4 = lz4;
            result.ZSTD = zstd;
            result.Snappy = snoopy;
        }
        return result;
    }
}

     构造函数里,需要提供待测试的数据类型、测试次数、测试数据的大小倍数,以及是否采用二进制数组压缩解压。其中针对每一项提供了一个函数。

最后,在Mian函数里,直接调用:

class Program
{
    static string fileName = "Result.txt";
    static void Main(string[] args)
    {
        List<string> lines = new List<string>();
        lines.Add(ResultInfo.GetDisplayHeader());
        BenchMarkManual manual;
        for (int i = 1; i <= 100; i++)
        {
            manual = new BenchMarkManual(DataFeed.DataFeedDataType.FastLevel2, 1000, i, true);
            ResultInfo result = manual.CompressionRatio();
            lines.Add(result.ToString());
            result = manual.CompressionTime();
            lines.Add(result.ToString());
            result = manual.CompressionAndDecompressionTime();
            lines.Add(result.ToString());
        }
        if (System.IO.File.Exists(fileName))
        {
            File.Delete(fileName);
        }
        System.IO.File.WriteAllLines(fileName, lines);
        Console.ReadLine();
    }
}

    我这里,对FastLevel2全速盘口这一数据类型,大小倍数从1测试到100,每次测试1000次取平均,然后将运行结果输出到txt文件,然后在Excel里面对比分析。

结果分析


     以上程序运行结束后,会将结果输出到txt文件,使用Excel打开txt就可以分析。

    ▲ 程序运行界面

     在对结果进行分析,并制图后,可以看到以下结果:

▲不同FastLevel2倍数大小下,不同算法的压缩率(压缩后大小/压缩前大小,越小越好)

    可以看到在压缩率方面,GZip和SharpZipGzip以及Zstd是在一组,LZ4和Snappy在一组,总体而言,前者比后者压缩率要好很多。C#内置的GZip实现、与SharpZip的实现,两者在压缩率方面比较接近,基本差不多。当Size变大时,GZip和SharpZip整体压缩率率比Zstd稍好,大概在9~10倍的FastLevel2大小时,两者比较接近。

▲不同FastLevel2倍数大小下,不同算法的压缩耗时(单位tick,越小越好)

     在压缩耗时方面,可以看到前面压缩率比较差的Snappy则表现的非常好,压缩很快,且随着压缩数据的大小增大,压缩时间增长也比较缓慢。Zstd表现也很好,曲线比较平缓。C#内置和SharpZip实现的GZip算法,在压缩时间方面耗时则比较长,并且随着数据Size的增大,压缩耗时增加也比较明显。这里可以看到C#内置的GZip实现,比SharpZipLib的实现,在压缩时间方面,要优秀很多。

▲不同FastLevel2倍数大小下,不同算法的压缩解压耗时(单位tick,越小越好)

     在压缩和解压总耗时方面,整体跟单独的压缩耗时差不多。依然是Snappy最优,Zstd也相当优秀,两个GZip的实现比较耗时,且SharpZipLib比内置的更耗时。

     总体而言,在对全速盘口行情的压缩方面,Zstd算法在压缩率方面与GZip接近,在9-10倍的FastLevel2大小时,两者压缩效率基本相等。但Zstd在兼顾压缩率的同时,压缩时间也比较短,且当数据大小增大时,耗时增加也比较平缓。总体而言,在这四个算法中,Zstd效果最好。

    需要提醒的是,对于ZstdNET这个类库,它提供了两种方法,一种是直接对二进制数组进行压缩解压,

byte[] Wrap(byte[] src);
byte[] Unwrap(byte[] src, int maxDecompressedSize = int.MaxValue);

    一种是对数据流进行压缩解压:

CompressionStream(Stream stream);
DecompressionStream(Stream stream);

     前者是后者效率的10倍还多(差别可能是二进制算法是在Wrap类里对使用P/Invoke对Native dll进行了一次直接调用,数据流方法则在Wrap类内部进行了多次的P/Invoke调用),所以如果要使用,尽量使用二进制数组的压缩解压方法。

总结


    需要说明的是,本文只是针对特定的平台:NET Framework 4.5、特定的测试数据:股票的极速Level2数据来进行不同压缩算法的相对效率比较。并且没有使用BenchmarkDotNet这种专业的性能分析工具来对比分析,其原因是限于.NET Framework 4.5平台的限制,BenchmarkDotNet最新的版本并不支持.NET Framework 4.5,如果使用4.5版本,则缺少一些特性,比如我需要的除了运行时间之外,还需要压缩率这种指标,在BenchmarkDotNet支持4.5的版本里还没找到如何实现。一个标准的测试,需要考虑线程优先级,垃圾回收对结果的影响,这些因素我没有考虑,但是这样的影响对所有的算法则都一样,且我是测试了1000次求平均得出的结果,所以应该是具有参考意义的。另外在Github上,EasyCompressor这个项目使用BenmarkDotNet比较了.NET Core平台下各种压缩效率,它得出的结论也是Zstd最好,有兴趣的可以看看。

    针对行情数据这种时序数据,本文分析了几种无损压缩算法,分别是内置的GZip、SharpZipLib的GZip实现、Zstd、LZ4和Snappy。在对比后发现,Zstd具有比较好的压缩率,同时也能有比较好的压缩速度,在这几种算法中最能满足情数据处理以及传输效率的要求。

 

参考

  1. https://github.com/facebook/zstd
  2. https://facebook.github.io/zstd/
  3. https://github.com/skbkontur/ZstdNet 
  4. https://www.cnblogs.com/t-bar/p/16506289.html
  5. https://www.cnblogs.com/buttercup/p/16414046.html 
  6. https://www.codenong.com/415291/ 
  7. https://zhuanlan.zhihu.com/p/414871190 
  8. https://github.com/mjebrahimi/EasyCompressor 
  9. https://github.com/lz4/lz4 
  10. https://github.com/MiloszKrajewski/K4os.Compression.LZ4 
  11. https://github.com/dotnet/BenchmarkDotNet 
  12. https://mp.weixin.qq.com/s/kbSdt8tRT4EtmxyYuj_dHA