// tokenstream_test.go
1 // Copyright (c) 2024-2026 Tencent Zhuque Lab. All rights reserved. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 // 15 // Requirement: Any integration or derivative work must explicitly attribute 16 // Tencent Zhuque Lab (https://github.com/Tencent/AI-Infra-Guard) in its 17 // documentation or user interface, as detailed in the NOTICE file. 18 19 package parser 20 21 import ( 22 "testing" 23 24 "github.com/stretchr/testify/assert" 25 "github.com/stretchr/testify/require" 26 ) 27 28 // TestTokenStream_HasNext_Empty 测试空 token 流的 hasNext 29 func TestTokenStream_HasNext_Empty(t *testing.T) { 30 ts := newTokenStream([]Token{}) 31 // 空流没有下一个 token 32 assert.False(t, ts.hasNext(), "空 tokenStream 的 hasNext 应返回 false") 33 } 34 35 // TestTokenStream_HasNext_NonEmpty 测试非空 token 流的 hasNext 36 func TestTokenStream_HasNext_NonEmpty(t *testing.T) { 37 tokens := []Token{ 38 {name: tokenText, content: "hello"}, 39 } 40 ts := newTokenStream(tokens) 41 // 有元素时 hasNext 应返回 true 42 assert.True(t, ts.hasNext(), "非空 tokenStream 的 hasNext 应返回 true") 43 } 44 45 // TestTokenStream_Next 测试 next 依次返回 token 46 func TestTokenStream_Next(t *testing.T) { 47 tokens := []Token{ 48 {name: tokenText, content: "first"}, 49 {name: tokenText, content: "second"}, 50 } 51 ts := newTokenStream(tokens) 52 53 // 第一次调用 next 返回第一个 token 54 tok, err := ts.next() 55 require.NoError(t, err, "第一次 next 不应报错") 56 assert.Equal(t, "first", tok.content, "第一次 next 应返回 'first'") 57 58 // 
第二次调用 next 返回第二个 token 59 tok, err = ts.next() 60 require.NoError(t, err, "第二次 next 不应报错") 61 assert.Equal(t, "second", tok.content, "第二次 next 应返回 'second'") 62 63 // 流已耗尽,hasNext 应返回 false 64 assert.False(t, ts.hasNext(), "所有 token 消费后 hasNext 应返回 false") 65 } 66 67 // TestTokenStream_Next_Exhausted 测试超出范围时 next 返回错误 68 func TestTokenStream_Next_Exhausted(t *testing.T) { 69 ts := newTokenStream([]Token{}) 70 // 空流调用 next 应返回错误 71 _, err := ts.next() 72 assert.Error(t, err, "空流调用 next 应返回错误") 73 } 74 75 // TestTokenStream_Rewind 测试 rewind 回退一步 76 func TestTokenStream_Rewind(t *testing.T) { 77 tokens := []Token{ 78 {name: tokenText, content: "a"}, 79 {name: tokenText, content: "b"}, 80 } 81 ts := newTokenStream(tokens) 82 83 // 消费第一个 token 84 tok, err := ts.next() 85 require.NoError(t, err) 86 assert.Equal(t, "a", tok.content, "next 应返回 'a'") 87 88 // 回退后再次 next 应返回同一个 token 89 ts.rewind() 90 tok, err = ts.next() 91 require.NoError(t, err, "rewind 后 next 不应报错") 92 assert.Equal(t, "a", tok.content, "rewind 后 next 应再次返回 'a'") 93 } 94 95 // TestTokenStream_HasNext_AfterConsumingAll 测试消费完所有 token 后 hasNext 96 func TestTokenStream_HasNext_AfterConsumingAll(t *testing.T) { 97 tokens := []Token{ 98 {name: tokenAnd, content: "&&"}, 99 } 100 ts := newTokenStream(tokens) 101 102 assert.True(t, ts.hasNext(), "消费前 hasNext 应为 true") 103 _, err := ts.next() 104 require.NoError(t, err) 105 // 消费完后 hasNext 应为 false 106 assert.False(t, ts.hasNext(), "消费完所有 token 后 hasNext 应为 false") 107 } 108 109 // TestTokenStream_RewindThenHasNext 测试 rewind 后 hasNext 的行为 110 func TestTokenStream_RewindThenHasNext(t *testing.T) { 111 tokens := []Token{ 112 {name: tokenText, content: "x"}, 113 } 114 ts := newTokenStream(tokens) 115 116 // 消费唯一的 token 117 _, err := ts.next() 118 require.NoError(t, err) 119 assert.False(t, ts.hasNext(), "消费后 hasNext 应为 false") 120 121 // rewind 回退后 hasNext 应重新为 true 122 ts.rewind() 123 assert.True(t, ts.hasNext(), "rewind 后 hasNext 应重新为 true") 124 } 125 126 // 
TestTokenStream_MultipleTokens_Sequential 测试顺序消费多个 token 127 func TestTokenStream_MultipleTokens_Sequential(t *testing.T) { 128 // 使用真实解析器生成 token,验证 tokenStream 与解析流程集成 129 tokens, err := ParseTokens(`body="test" && header="x-header"`) 130 require.NoError(t, err, "ParseTokens 不应报错") 131 132 ts := newTokenStream(tokens) 133 count := 0 134 for ts.hasNext() { 135 _, err := ts.next() 136 require.NoError(t, err) 137 count++ 138 } 139 // 应恰好消费完所有 token 140 assert.Equal(t, len(tokens), count, "消费的 token 数应等于总 token 数") 141 }