2022-02-25 InfluxDB 存储引擎 TSM 写数据时对 WAL 和 cache 的处理

2023-09-27 14:25:42 时间














// WritePoints writes metadata and point data into the engine.
// It returns an error if new points are added to an existing key.
//
// For each field of each point it builds the composite key
// "<series key><keyFieldSeparator><field key>", validates the field type
// against e.seriesTypeMap (when that map is non-nil), converts the field into
// a Value stamped with the point's UnixNano timestamp, and groups the values
// by composite key. The grouped values are then written to e.Cache first and,
// only if e.WALEnabled is set, to e.WAL afterwards. A type conflict is
// remembered in seriesErr and returned after the writes.
//
// NOTE(review): this excerpt is not compilable as shown. Closing braces, the
// ")" that ends the var block, and apparently a few short statements are
// missing. Confirm every detail against the upstream engine.go.
func (e *Engine) WritePoints(points []models.Point) error {
	values := make(map[string][]Value, len(points))
	var (
		// keyBuf is a scratch buffer reused for every composite key.
		// baseLen is the length of keyBuf up to and including the separator.
		// seriesErr records a field type conflict; it is the final return value.
		keyBuf    []byte
		baseLen   int
		seriesErr error

	for _, p := range points {
		// Reset the buffer to "<series key><separator>" for this point.
		keyBuf = append(keyBuf[:0], p.Key()...)
		keyBuf = append(keyBuf, keyFieldSeparator...)
		baseLen = len(keyBuf)
		iter := p.FieldIterator()
		t := p.Time().UnixNano()
		for iter.Next() {
			// Skip fields named "time"; they are illegal.
			// NOTE(review): the body of this if (presumably a `continue`) is not
			// visible in this excerpt.
			if bytes.Equal(iter.FieldKey(), timeBytes) {

			// Truncate back to the series prefix and append the current field key.
			keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)

			if e.seriesTypeMap != nil {
				// Fast-path check to see if the field for the series already exists.
				if v, ok := e.seriesTypeMap.Get(keyBuf); !ok {
					if typ, err := e.Type(keyBuf); err != nil {
						// Field type is unknown, we can try to add it.
					} else if typ != iter.Type() {
						// Existing type is different from what was passed in, we need to drop
						// this write and refresh the series type map.
						// NOTE(review): the comment says the write is dropped, but no
						// `continue` is visible after the Insert below; presumably lost.
						seriesErr = tsdb.ErrFieldTypeConflict
						e.seriesTypeMap.Insert(keyBuf, int(typ))

					// Doesn't exist, so try to insert
					vv, ok := e.seriesTypeMap.Insert(keyBuf, int(iter.Type()))

					// We didn't insert and the type that exists isn't what we tried to insert, so
					// we have a conflict and must drop this field/series.
					if !ok || vv != int(iter.Type()) {
						seriesErr = tsdb.ErrFieldTypeConflict
				} else if v != int(iter.Type()) {
					// The series already exists, but with a different type. This is also a type
					// conflict and we need to drop this field/series.
					seriesErr = tsdb.ErrFieldTypeConflict

			// Convert the field into a typed Value carrying timestamp t. A decode
			// error from the iterator aborts the whole call.
			var v Value
			switch iter.Type() {
			case models.Float:
				fv, err := iter.FloatValue()
				if err != nil {
					return err
				v = NewFloatValue(t, fv)
			case models.Integer:
				iv, err := iter.IntegerValue()
				if err != nil {
					return err
				v = NewIntegerValue(t, iv)
			case models.Unsigned:
				iv, err := iter.UnsignedValue()
				if err != nil {
					return err
				v = NewUnsignedValue(t, iv)
			case models.String:
				v = NewStringValue(t, iter.StringValue())
			case models.Boolean:
				bv, err := iter.BooleanValue()
				if err != nil {
					return err
				v = NewBooleanValue(t, bv)
				// NOTE(review): the return below is presumably the switch's default
				// case; the `default:` label is not visible in this excerpt.
				return fmt.Errorf("unknown field type for %s: %s", string(iter.FieldKey()), p.String())
			// Group the value under its composite key.
			values[string(keyBuf)] = append(values[string(keyBuf)], v)

	// NOTE(review): no matching e.mu.RLock() is visible before this deferred
	// unlock; presumably the acquiring line was dropped. Confirm upstream.
	defer e.mu.RUnlock()

	// first try to write to the cache
	if err := e.Cache.WriteMulti(values); err != nil {
		return err

	// Then, if the WAL is enabled, append the same values to it. A WAL error is
	// returned to the caller; no rollback of the cache write is visible here.
	if e.WALEnabled {
		if _, err := e.WAL.WriteMulti(values); err != nil {
			return err
	return seriesErr


// WriteMulti writes the map of keys and associated values to the cache. This
// function is goroutine-safe. It returns an error if the cache would exceed
// its max size by adding the new values. The write attempts to write as many
// values as possible. If one key fails, the others can still succeed and an
// error will be returned.
//
// NOTE(review): the "goroutine-safe" claim comes from the original comment.
// No locking is visible in this excerpt, which appears to have lost lines
// (closing braces and possibly lock/unlock calls). Confirm upstream.
func (c *Cache) WriteMulti(values map[string][]Value) error {
	// Total encoded size of all incoming values.
	var addedSize uint64
	for _, v := range values {
		addedSize += uint64(Values(v).Size())

	// Enough room in the cache?
	limit := c.maxSize // maxSize is safe for reading without a lock.
	n := c.Size() + addedSize
	// A limit of 0 disables the size check.
	if limit > 0 && n > limit {
		atomic.AddInt64(&c.stats.WriteErr, 1)
		return ErrCacheMemorySizeLimitExceeded(n, limit)

	// werr keeps the most recent per-key write error, if any.
	var werr error
	store := c.store

	// We'll optimistically set size here, and then decrement it for write errors.
	for k, v := range values {
		newKey, err := store.write([]byte(k), v)
		if err != nil {
			// The write failed, hold onto the error and adjust the size delta.
			werr = err
			addedSize -= uint64(Values(v).Size())
		// presumably newKey is true when k was not yet in the store, so the key
		// bytes are counted as well; verify against store.write.
		if newKey {
			addedSize += uint64(len(k))

	// Some points in the batch were dropped.  An error is returned so
	// error stat is incremented as well.
	if werr != nil {
		atomic.AddInt64(&c.stats.WriteDropped, 1)
		atomic.AddInt64(&c.stats.WriteErr, 1)

	// Update the memory size stat
	// NOTE(review): no statement consuming addedSize is visible here; only the
	// WriteOK counter is incremented. The size update was presumably dropped
	// from this excerpt.
	atomic.AddInt64(&c.stats.WriteOK, 1)

	c.lastWriteTime = time.Now()

	return werr


// WriteMulti writes the given values to the WAL. It returns the WAL segment ID to
// which the points were written. If an error is returned the segment ID should
// be ignored.
//
// The values are wrapped in a WriteWALEntry and handed to writeToLog. The
// WriteErr or WriteOK counter is incremented according to the outcome, and -1
// is returned as the segment ID on failure.
//
// NOTE(review): closing braces are missing from this excerpt.
func (l *WAL) WriteMulti(values map[string][]Value) (int, error) {
	entry := &WriteWALEntry{
		Values: values,

	id, err := l.writeToLog(entry)
	if err != nil {
		atomic.AddInt64(&l.stats.WriteErr, 1)
		return -1, err
	atomic.AddInt64(&l.stats.WriteOK, 1)

	return id, nil

// writeToLog encodes entry, snappy-compresses the encoded bytes, appends the
// result to the current segment writer, and then blocks until a result
// arrives on the sync channel it registered. It returns the ID of the segment
// that was written, or -1 with an error if encoding, rolling, writing or
// registering for the sync fails.
//
// NOTE(review): this excerpt appears to have lost lines (closing braces,
// `default:` labels, the lock acquisition, and possibly the calls that return
// buffers to bytesPool and trigger the sync). Confirm against upstream wal.go.
func (l *WAL) writeToLog(entry WALEntry) (int, error) {
	// limit how many concurrent encodings can be in flight.  Since we can only
	// write one at a time to disk, a slow disk can cause the allocations below
	// to increase quickly.  If we're backed up, wait until others have completed.
	// NOTE(review): no limiter is visible in this excerpt; the comment may
	// describe code that is not shown here.
	bytes := bytesPool.Get(entry.MarshalSize())

	b, err := entry.Encode(bytes)
	if err != nil {
		return -1, err

	// Obtain a destination buffer sized for the worst-case compressed length.
	encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))

	compressed := snappy.Encode(encBuf, b)

	// Channel on which the result of the sync is delivered to this writer.
	syncErr := make(chan error)

	segID, err := func() (int, error) {
		// NOTE(review): no matching l.mu.Lock() is visible before this deferred
		// unlock; presumably dropped from the excerpt.
		defer l.mu.Unlock()

		// Make sure the log has not been closed
		// NOTE(review): as shown, this select has no default case and would block
		// until l.closing is readable; a `default:` was presumably dropped.
		select {
		case <-l.closing:
			return -1, ErrWALClosed

		// roll the segment file if needed
		if err := l.rollSegment(); err != nil {
			return -1, fmt.Errorf("error rolling WAL segment: %v", err)

		// write and sync
		if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {
			return -1, fmt.Errorf("error writing WAL entry: %v", err)

		// Register syncErr as a waiter for the next sync.
		// NOTE(review): the error return below presumably belongs to a `default:`
		// case (waiter could not be queued) that is not visible in this excerpt.
		select {
		case l.syncWaiters <- syncErr:
			return -1, fmt.Errorf("error syncing wal")

		// Update stats for current segment size
		atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))

		l.lastWriteTime = time.Now().UTC()

		return l.currentSegmentID, nil



	if err != nil {
		return segID, err

	// schedule an fsync and wait for it to complete
	// (the receive blocks until a result is sent on syncErr)
	return segID, <-syncErr

// Write writes entryType and the buffer containing compressed entry data.
//
// Each record is a 5-byte header followed by the payload: byte 0 is the entry
// type and bytes 1-4 hold len(compressed) as a big-endian uint32. On success
// w.size grows by the header length plus the payload length. The first error
// from the underlying writer is returned unchanged.
//
// NOTE(review): closing braces are missing from this excerpt.
func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) error {
	var buf [5]byte
	buf[0] = byte(entryType)
	binary.BigEndian.PutUint32(buf[1:5], uint32(len(compressed)))

	// Header first, then the compressed payload.
	if _, err := w.bw.Write(buf[:]); err != nil {
		return err

	if _, err := w.bw.Write(compressed); err != nil {
		return err

	w.size += len(buf) + len(compressed)

	return nil


// Encode converts the WriteWALEntry into a byte stream using dst if it
// is large enough.  If dst is too small, the slice will be grown to fit the
// encoded entry.
//
// It returns the encoded bytes, or an error if a value has an unsupported
// type or if a slice mixes value types.
//
// NOTE(review): this excerpt appears to have lost lines (closing braces,
// `default:` labels and some offset increments). Confirm against upstream.
func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
	// The entry's values are encoded as follows:
	// For each key and slice of values, first a 1 byte type for the []Values
	// slice is written.  Following the type, the length and key bytes are written.
	// Following the key, a 4 byte count followed by each value as an 8 byte time
	// and N byte value.  The value is dependent on the type being encoded.  float64,
	// int64, use 8 bytes, boolean uses 1 byte, and string is similar to the key encoding,
	// except that string values have a 4-byte length, and keys only use 2 bytes.
	// This structure is then repeated for each key and value slice.
	// ┌────────────────────────────────────────────────────────────────────┐
	// │                           WriteWALEntry                            │
	// ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤
	// │ Type │ Key Len │   Key  │ Count │  Time   │  Value  │...│ Type │...│
	// │1 byte│ 2 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │   │1 byte│   │
	// └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘

	encLen := w.MarshalSize() // Type (1), Key Length (2), and Count (4) for each key

	// allocate or re-slice to correct size
	if len(dst) < encLen {
		dst = make([]byte, encLen)
	} else {
		dst = dst[:encLen]

	// Finally, encode the entry
	// n is the write offset into dst; curType is the type tag of the current slice.
	var n int
	var curType byte

	for k, v := range w.Values {
		// The type tag for the whole slice is taken from its first element.
		switch v[0].(type) {
		case FloatValue:
			curType = float64EntryType
		case IntegerValue:
			curType = integerEntryType
		case UnsignedValue:
			curType = unsignedEntryType
		case BooleanValue:
			curType = booleanEntryType
		case StringValue:
			curType = stringEntryType
			// NOTE(review): the return below is presumably the switch's default
			// case; the `default:` label is not visible in this excerpt.
			return nil, fmt.Errorf("unsupported value type: %T", v[0])
		// NOTE(review): no increment of n is visible after writing the type byte,
		// so as shown the key length would overwrite it; presumably an `n++` was
		// dropped from this excerpt.
		dst[n] = curType

		// 2-byte big-endian key length followed by the key bytes.
		binary.BigEndian.PutUint16(dst[n:n+2], uint16(len(k)))
		n += 2
		n += copy(dst[n:], k)

		// 4-byte big-endian count of values for this key.
		binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(v)))
		n += 4

		for _, vv := range v {
			// 8-byte big-endian timestamp.
			binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.UnixNano()))
			n += 8

			// Every value must match the slice's type tag; a mismatch is an error.
			switch vv := vv.(type) {
			case FloatValue:
				if curType != float64EntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				binary.BigEndian.PutUint64(dst[n:n+8], math.Float64bits(vv.value))
				n += 8
			case IntegerValue:
				if curType != integerEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
				n += 8
			case UnsignedValue:
				if curType != unsignedEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
				n += 8
			case BooleanValue:
				if curType != booleanEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				// A boolean is stored as a single byte: 1 for true, 0 for false.
				// NOTE(review): no increment of n is visible after this byte is
				// written; presumably an `n++` was dropped from this excerpt.
				if vv.value {
					dst[n] = 1
				} else {
					dst[n] = 0
			case StringValue:
				if curType != stringEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				// 4-byte big-endian length followed by the string bytes.
				binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(vv.value)))
				n += 4
				n += copy(dst[n:], vv.value)
				// NOTE(review): the return below is presumably the default case of
				// this switch; the `default:` label is not visible in this excerpt.
				return nil, fmt.Errorf("unsupported value found in %T slice: %T", v[0].Value(), vv)

	return dst[:n], nil

可以看出, WritePoints 先写 cache (也就是先写入内存), cache 写入成功之后才写 WAL 文件.

	// first try to write to the cache
	if err := e.Cache.WriteMulti(values); err != nil {
		return err

	if e.WALEnabled {
		if _, err := e.WAL.WriteMulti(values); err != nil {
			return err


(gdb) p keyBuf
$1 = {array = 0xc000ab61a0 "cpu#!~#value", len = 12, cap = 16}
(gdb) bt
#0  influxdb.cluster/tsdb/engine/tsm1.(*Engine).WritePoints (e=0xc0006418c0, points=..., ~r1=...) at /root/work/influxdb-1.8.4/tsdb/engine/tsm1/engine.go:1340
#1  0x0000000000d71c3e in influxdb.cluster/tsdb.(*Shard).WritePoints (s=0xc0000cf8c0, points=..., ~r1=...) at /root/work/influxdb-1.8.4/tsdb/shard.go:525
#2  0x0000000000d951ee in influxdb.cluster/tsdb.(*Store).WriteToShard (s=0xc0000f6c00, shardID=7, points=..., ~r2=...) at /root/work/influxdb-1.8.4/tsdb/store.go:1395
#3  0x0000000000db59b9 in influxdb.cluster/coordinator.(*PointsWriter).writeToShard.func1 (shardID=7, owner=..., points=...) at /root/work/influxdb-1.8.4/coordinator/points_writer.go:443
#4  0x0000000000db5712 in influxdb.cluster/coordinator.(*PointsWriter).writeToShard·dwrap·8 () at /root/work/influxdb-1.8.4/coordinator/points_writer.go:470
#5  0x0000000000470ac1 in runtime.goexit () at /usr/local/go/src/runtime/asm_amd64.s:1581
#6  0x0000000000000056 in ?? ()
#7  0x0000000000000011 in ?? ()
#8  0x000000000000005e in ?? ()
#9  0x000000000000000e in ?? ()
#10 0x000000c000e3a000 in ?? ()
#11 0x0000000000000000 in ?? ()


  1.  写入 cache 成功但写 WAL 文件失败时, WritePoints 会返回错误, 即认为本次写入失败 (从上面的代码看, 已写入 cache 的数据并没有回滚). 这种策略是否意味着由客户端负责重试写入?