最近工作中遇到一个需求,需要实时分析线上程序通过log4net记录下来的日志。在此之前都是手动从服务器上把日志文件拷贝出来,或者把分析程序拷贝到服务器上,然后进行日志分析。那么有没有办法实时读取并进行分析呢?这样在“事中”就能发现异常并介入处理,而不是“事后”。这就是本文要介绍的内容:通过编写程序来监控日志文件的变化,一旦有变化,就进行读取然后进行分析。这里边有两个关键点:

  • 如何控制多个进程同时对同一文件进行读写,并处理冲突。
  • 如何监控文件的变化,并对变化内容进行读取。

这里记录一下我对这两个关键点的思考和处理。

问题


线上有个程序,使用了log4net来记录日志,log4net的配置如下:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
	<configSections>
		<section name="log4net" type="System.Configuration.IgnoreSectionHandler" />
	</configSections>
	<appSettings>
		<add key="somekey" value="somevalue"/>
	</appSettings>
	<log4net>
		<root>
			<!--文件形式记录日志-->
			<appender-ref ref="LogFileAppender" />
		</root>
		<!--定义输出到文件中-->
		<appender name="LogFileAppender" type="log4net.Appender.RollingFileAppender">
			<param name="file" value="./log/logfile_" />
			<param name="appendToFile" value="true" />
			<param name="rollingStyle" value="Date" />
			<param name="StaticLogFileName" value="false" />
			<datePattern value="yyyyMMdd'.log'" />
			<param name="Threshold" value="DEBUG" />
			<layout type="log4net.Layout.PatternLayout">
				<param name="ConversionPattern" value="[%d{yyyy-MM-dd HH:mm:ss,fff}] %5p - %m%n" />
			</layout>
		</appender>
	</log4net>
	<startup> 
        <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
    </startup>
</configuration>

程序中的一些debug信息,会以名为"logfile_{yyyyMMdd}.log"的文本文件记录下来。在程序运行过程中,每当调用ILog.Debug时,就会往该文件写入内容。

现在需要实现一个功能,实时监控“logfile_{yyyyMMdd}.log”文件的变化,并将其内容读取出来,以便进一步的处理和分析。

文件变化监控的实现


要监控文件的变化,可以使用FileSystemWatcher类,当设置好需要监控的文件夹以及筛选条件后,当符和筛选条件的文件发生变化时,就会触发相应的事件。

FileSystemWatcher watcher = new FileSystemWatcher(watchFolder, watchedFileName);
watcher.EnableRaisingEvents = true;
watcher.NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size;
watcher.Changed += watcher_Changed;

上述指定了文件夹和文件名,FileSystemWatcher就会监控这个文件。文件名也可以使用通配符,比如要监控所有的txt文件,这个文件名可以写为“*.txt”。还可以设置需要提醒的变化类型,比如这里当文件的最后写入时间或者大小发生变化时就提醒。提醒是通过事件来触发的,这里的订阅事件如下:

private void watcher_Changed(object sender, FileSystemEventArgs e)
{
    if (e.ChangeType == WatcherChangeTypes.Created || e.ChangeType == WatcherChangeTypes.Changed)
    {
        changedFileName = e.FullPath;
        fileChanged = true;
    }
}

在事件处理中,这里只记录了变更的文件名称以及是否有变更标识。

没有在事件变更里面直接读取文件是因为这个文件的写入可能非常频繁,由于对实时性要求没有那么高,所以这里开启了一个定时器来定时的检查并读取变更文件的内容,这里设置为了3秒一次,每3秒检查一下文件是否有变更即fileChange是否为true,如果有变更则先将fileChange置为false,然后进行后续的读取操作。

读取文件的变更内容


首先定义一个计时器,计时器定义后马上运行一次,以读取文件里面的全部内容。

Timer timer = new Timer(readFileTimer_Elapsed, null, 0, readInterval);

在回调事件中,判断文件是否有变化然后读取。这里需要注意的是首先将定时器暂停,然后当执行完成之后,再开启定时器。

private void readFileTimer_Elapsed(object sender)
{
    timer.Change(Timeout.Infinite, readInterval);
    if (fileChanged && !string.IsNullOrEmpty(changedFileName) && File.Exists(changedFileName))
    {
        fileChanged = false;
        List<string> oneBatchData = new List<string>();
        using (var fileStream = new FileStream(changedFileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
        using (var sr = new StreamReader(fileStream, Encoding.Default))
        {
            sr.BaseStream.Seek(lastReadPosition, SeekOrigin.Begin);
            while (!sr.EndOfStream)
            {
                string msg = sr.ReadLine();
                if (!string.IsNullOrEmpty(msg))
                {
                    oneBatchData.Add(msg);
                }
            }
            lastReadPosition = sr.BaseStream.Position;
        }

        if (oneBatchData.Count > 0)
        {
            allLines.AddRange(oneBatchData);
            FireNewLines(oneBatchData);
        }
    }
    timer.Change(readInterval, readInterval);
}

这里面有两个关键点,一个是如何处理多个进程对一个文件写入,另外一个进程对文件进行读取的问题。另外一个就是如何存储上次读取到的文件的位置。

处理多个进程读取同一文件的冲突


我们知道,在同一个进程中,多个线程读写同一个文件的时候,需要加锁。但如果是多个进程读取同一文件,除非这多个进程都是自己编写的,那么可以使用全局锁,比如Mutex(在log4net中,如果一个程序可以开多个实例,则必须把锁设置为InterProcessLock,其内部就是使用了Mutex,否则多个实例运行时可能会产生成多个日志文件)。如果其它的程序不是自己写的,无法修改,那就不能用锁了,这时FileStream就有用了,它提供了读写文件时的共享参数,即FileShare,以用来处理读写文件时的共享问题,它有以下枚举值:

//包含用于控制其他 System.IO.FileStream 对象对同一文件可以具有的访问类型的常数。
[ComVisible(true)]
[Flags]
public enum FileShare
{
    // 谢绝共享当前文件。 文件关闭前,打开该文件的任何请求(由此进程或另一进程发出的请求)都将失败。
    None = 0,
    // 允许随后打开文件读取。
    // 如果未指定此标志,则文件关闭前,任何打开该文件以进行读取的请求(由此进程或另一进程发出的请求)都将失败。
    // 但是,即使指定了此标志,仍可能需要附加权限才能够访问该文件。
    Read = 1,
    // 允许随后打开文件写入。
    // 如果未指定此标志,则文件关闭前,任何打开该文件以进行写入的请求(由此进程或另一进过程发出的请求)都将失败。
    // 但是,即使指定了此标志,仍可能需要附加权限才能够访问该文件。
    Write = 2,
    // 允许随后打开文件读取或写入。
    // 如果未指定此标志,则文件关闭前,任何打开该文件以进行读取或写入的请求(由此进程或另一进程发出)都将失败。
    // 但是,即使指定了此标志,仍可能需要附加权限才能够访问该文件。
    ReadWrite = 3,
    //允许随后删除文件。
    Delete = 4,
    //使文件句柄可由子进程继承。 Win32 不直接支持此功能。
    Inheritable = 16
}

这个参数的含义是指定了本次打开文件后,其它线程或进程允许对该文件的相关操作

在本例中,我使用了这个参数:

using (var fileStream = new FileStream(changedFileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))

需要注意的是,这里的changedFileName是日志文件,另外一个线上的程序可能随时都在对这个文件进行读写。我们的监控程序,在打开这个文件的时候,指定了FileShare.ReadWrite标识,即表示允许其它线程或进程在我们打开文件的时候,同时可以进行读取或者写入。

在一开始的时候,我并没有手动创建一个FileStream并指定FileShare参数,而是直接传入文件名,定义了一个SteamReader。

using (var sr = new StreamReader(changedFileName, Encoding.Default))

然后发现,当程序运行的时候,线上运行的程序在调用log4net往日志文件里写入内容的时候,有日志丢失的情况,很显然,这时发生了读写冲突。为了研究直接定义StreamReader并传入文件名的做法在内部的实现,我查看了StreamReader源码,可以发现,它在内部创建了一个FileStream对象,FileShare的参数指定为了Read。

[System.Security.SecurityCritical]
[ResourceExposure(ResourceScope.Machine)]
[ResourceConsumption(ResourceScope.Machine)]
internal StreamReader(String path, Encoding encoding, bool detectEncodingFromByteOrderMarks, int bufferSize, bool checkHost)
{
    // Don't open a Stream before checking for invalid arguments,
    // or we'll create a FileStream on disk and we won't close it until
    // the finalizer runs, causing problems for applications.
    if (path == null || encoding == null)
        throw new ArgumentNullException((path == null ? "path" : "encoding"));
    if (path.Length == 0)
        throw new ArgumentException(Environment.GetResourceString("Argument_EmptyPath"));
    if (bufferSize <= 0)
        throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
    Contract.EndContractBlock();

    Stream stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, DefaultFileStreamBufferSize, FileOptions.SequentialScan, Path.GetFileName(path), false, false, checkHost);
    Init(stream, encoding, detectEncodingFromByteOrderMarks, bufferSize, false);
}

也就是说,FileShare.Read指定了,当我们打开一个文件进行读操作的时候,其它的线程或者进程对该文件只能进行读操作。而不能进行读之外的其它操作比如写,或者删除,如果进行其它线程或者进程对该文件进行写或者删除,那么就会失败。直到当前的读操作完成,关闭文件后,其它进程或线程才可以操作。这也验证了,当直接使用ReadStream来读日志文件时,线上的程序无法写入日志文件从而导致日志信息丢失的问题。

解决办法就是手动创建FileStream,并手动指定FileShare参数,并将FileStream作为参数传入StreamReader中,因为我们这里只是读这个日志文件,不要求绝对的实时,所以可以指定其它进程或线程可以读或者可以读写。

另外还有一个问题,那就是log4net在以RollingFileAppender的方式写入文件的时候,指定的是FileShare是什么模式呢?如果他指定的不是FileShare.Read模式,而是其它模式,那么我们的监控程序也不能读文件,只有等日志文件写入完成之后,保存完之后,才能写入了。带着这一个疑问,我查找了log4net的源码:

在log4net.Appender.FileAppender.ExclusiveLock有个OpenFile方法。

public override void OpenFile(string filename, bool append, Encoding encoding)
{
	try
	{
		m_stream = CreateStream(filename, append, FileShare.Read);
	}
	catch (Exception ex)
	{
		base.CurrentAppender.ErrorHandler.Error("Unable to acquire lock on file " + filename + ". " + ex.Message);
	}
}

CreateStream在log4net.Appender.FileAppender.LockingModelBase下面:

protected Stream CreateStream(string filename, bool append, FileShare fileShare)
{
	using (CurrentAppender.SecurityContext.Impersonate(this))
	{
		string directoryName = Path.GetDirectoryName(filename);
		if (!Directory.Exists(directoryName))
		{
			Directory.CreateDirectory(directoryName);
		}
		FileMode mode = (append ? FileMode.Append : FileMode.Create);
		return new FileStream(filename, mode, FileAccess.Write, fileShare);
	}
}

可以看到,在CreateStream时,指定的FileShare参数为Read,即表示,当log4net在写入文件时,允许其它线程或进程读取文件。所以我们的监控程序读取日志文件是没有问题,如果想写入文件就会报错。

在log4net内部,多个线程同时写入日志文件的时候,它在内部是使用锁来保证同一时间只有一个线程能写文件的。

读取文件的变化内容


解决了文件读写冲突问题之后,接下来就要解决读取文件变化的内容。比如初始的时候去读文件的全部内容,然后当日志文件发生变更的时候,从上一次读取的地方接着读,一直读到末尾。所以在一开始的做法是,记录下上次读取的行数,然后有变更的时候,从上次读取的行数之后开始读取。

using (var sr = new StreamReader(fileStream, Encoding.Default))
{
    //skip readed file
    if (lastReadLine > 0)
    {
        for (int i = 0; i < lastReadLine; ++i)
            sr.ReadLine();
    }
    while (!sr.EndOfStream)
    {
        string msg = sr.ReadLine();
        if (!string.IsNullOrEmpty(msg))
        {
            lastReadLine += 1;
            onceDatas.Add(msg);
        }
    }
}

这里每次读取一行后会记录一下行号,下次读取的时候,也从开头一行一行读取,然后掠过开头的n行。

以上方法很笨重,后来发现可以直接记录上次读取的Stream的位置,然后在下次读取的时候,直接使用Seek方法定位到上次读取位置之后继续读取就可以了,这样更方便快捷,不需要再读取前n行了。所以改进后的方法如下:

using (var sr = new StreamReader(fileStream, Encoding.Default))
{
    sr.BaseStream.Seek(lastReadPosition, SeekOrigin.Begin);
    while (!sr.EndOfStream)
    {
        string msg = sr.ReadLine();
        if (!string.IsNullOrEmpty(msg))
        {
            oneBatchData.Add(msg);
        }
    }
    lastReadPosition = sr.BaseStream.Position;
}

非常的快捷高效😃

至此,整个监控日志变化的核心两个问题都解决了。

完整实现


为了整合上述所有功能,我这里创建了一个FileWatcher类,它的构造函数为文件目录,文件名,和一个文件标志符WatchFileType。这个watchFileType只是用来区分是哪个文件,当同时监控多个文件,实例化多个FileWatcher类的时候,可以通过watchFileType来区分是哪个文件。

当初始化之后,调用Start方法,首先会创建一个FileSystemWatcher对象,然后创建一个3秒一次间隔的Timer对象,这个Timer对象在调用Start方式时会马上执行一次回调。这样可以立即把日志文件的内容读取出来。

当有文件内容发生变化时,会把距离上一时刻到现在文件中的记录通过事件触发发送出来。FileWatcher也保存了该监控文件的所有内容。

public class FileWatcher
{
    public EventHandler<FileWatcherNewLinesEventArgs> FileWatcherNewLines;
    public IReadOnlyCollection<string> AllLines => allLines.AsReadOnly();
    public WatchFileType WatchFileType => watchFileType;
    private WatchFileType watchFileType;
    private Timer timer;
    private bool fileChanged = true;
    private List<string> allLines = new List<string>();
    private readonly int readInterval = 1 * 1000;
    private string watchedFileName = string.Empty;
    private string watchFolder = string.Empty;
    private string changedFileName = string.Empty;
    private long lastReadPosition = 0;
    private FileSystemWatcher watcher;

    public FileWatcher(string folder, string file, WatchFileType type)
    {
        watchFolder = folder;
        watchedFileName = file;
        watchFileType = type;
        changedFileName = Path.Combine(folder, watchedFileName);
    }

    public bool Start()
    {
        if (!Directory.Exists(watchFolder))
        {
            return false;
        }
        watcher = new FileSystemWatcher(watchFolder, watchedFileName);
        watcher.EnableRaisingEvents = true;
        watcher.NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size;
        watcher.Changed += watcher_Changed;
        timer = new Timer(readFileTimer_Elapsed, null, 0, readInterval);
        return true;
    }

    public void Stop()
    {
        if (watcher != null)
        {
            watcher.Changed -= watcher_Changed;
            watcher = null;
        }

        if (timer != null)
        {
            timer.Change(Timeout.Infinite, Timeout.Infinite);
        }
    }

    private void readFileTimer_Elapsed(object sender)
    {
        timer.Change(Timeout.Infinite, readInterval);
        if (fileChanged && !string.IsNullOrEmpty(changedFileName) && File.Exists(changedFileName))
        {
            fileChanged = false;
            List<string> oneBatchData = new List<string>();
            using (var fileStream = new FileStream(changedFileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
            using (var sr = new StreamReader(fileStream, Encoding.Default))
            {
                sr.BaseStream.Seek(lastReadPosition, SeekOrigin.Begin);
                while (!sr.EndOfStream)
                {
                    string msg = sr.ReadLine();
                    if (!string.IsNullOrEmpty(msg))
                    {
                        oneBatchData.Add(msg);
                    }
                }
                lastReadPosition = sr.BaseStream.Position;
            }

            if (oneBatchData.Count > 0)
            {
                allLines.AddRange(oneBatchData);
                FireNewLines(oneBatchData);
            }
        }
        timer.Change(readInterval, readInterval);
    }
    private void watcher_Changed(object sender, FileSystemEventArgs e)
    {
        if (e.ChangeType == WatcherChangeTypes.Created || e.ChangeType == WatcherChangeTypes.Changed)
        {
            changedFileName = e.FullPath;
            fileChanged = true;
        }
    }

    public void FireNewLines(List<string> lines)
    {
        if (FileWatcherNewLines != null)
        {
            FileWatcherNewLinesEventArgs args = new FileWatcherNewLinesEventArgs(lines, watchFileType);
            FileWatcherNewLines.Invoke(this, args);
        }
    }
}

一般地,可能有同时监控多个日志文件的需求,比如需要同事监控Normal日志和Debug日志,它是分别打印在不同的文件之中的,这时就可以定义一个FileWatcherManger类,然后在里面维护一个List<FileWatcher>列表。

在我的这个项目中,我监控了2个日志文件,并将这些日志文件的内容通过TCP的方式,发送给了日志分析的客户端。当TCP客户端初次连接时,立即发送日志文件里的所有内容,日志文件的所有最新内容在FileWatcher里面有记录。随后当日志发生变化时,只需要发送给客户端增量内容即可。

总结


本文介绍了一种监控日志文件变化,并读取变动内容的方法。通过使用FileSystemWatcher类,可以监控文件变化,并通过回调事件触发通知。为了避免文件频繁变动导致频繁的文件读写,这里使用了一个定时器来定时读取。当文件距离上一次读取发生变化时就读取。在读取文件的时候,由于涉及到多进程读写同一个文件,所以这里创建了FileStream并通过指定FileShare参数,而不是直接创建StreamReader来解决了读写冲突导致日志文件丢失的问题。在解决这一问题的过程中,还分析了long4net源码中RollingFileAppender写入文件时的FileShare参数以及StreamReader源码中,如果直接指定path时FileStream的参数。

 

参考:

https://learn.microsoft.com/en-us/dotnet/api/system.io.filesystemwatcher?view=net-6.0 

https://learn.microsoft.com/en-us/dotnet/api/system.io.fileshare?view=net-8.0 

https://www.cnblogs.com/wigis/p/5023229.html