

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 PySpark 分析模板中的参数
<a name="pyspark-parameter-handling"></a>

参数允许在作业提交时提供不同的值，从而提高 PySpark 分析模板的灵活性。参数可通过传递给入口点函数的上下文对象进行访问。

**注意**  
参数是用户提供的字符串，可以包含任意内容。  
查看代码以确保参数得到安全处理，以防止分析中出现意外行为。
无论提交时提供什么参数值，设计参数处理都要安全运行。

## 访问参数
<a name="accessing-parameters"></a>

参数可在`context['analysisParameters']`字典中找到。所有参数值均为字符串。

**Example 安全访问参数**  

```
def entrypoint(context):
    # Access parameters from context
    parameters = context['analysisParameters']
    threshold = parameters['threshold']
    table_name = parameters['table_name']
    
    # Continue with analysis using parameters
    spark = context['sparkSession']
    input_df = context['referencedTables'][table_name]
    
    # Convert threshold value
    threshold_val = int(threshold)
    
    # Use parameter in DataFrame operation
    filtered_df = input_df.filter(input_df.amount > threshold_val)
    
    return {
        "results": {
            "output": filtered_df
        }
    }
```

## 参数安全的最佳实践
<a name="parameter-security-best-practices"></a>

**警告**  
参数是用户提供的字符串，可以包含任意内容。您必须安全地处理参数，以防分析代码中存在安全漏洞。

**要避免的不安全参数处理模式：**
+ **将参数作为代码执行** — 切勿`exec()`在参数值上使用 o `eval()` r

  ```
  # UNSAFE - Don't do this
  eval(parameters['expression'])  # Can execute arbitrary code
  ```
+ **SQL 字符串插值** — 切勿将参数直接连接到 SQL 字符串中

  ```
  # UNSAFE - Don't do this
  sql = f"SELECT * FROM table WHERE column = '{parameters['value']}'"  # SQL injection risk
  ```
+ **不安全的文件路径操作** — 未经验证，切勿在文件系统操作中直接使用参数

  ```
  # UNSAFE - Don't do this
  file_path = f"/data/{parameters['filename']}"  # Path traversal risk
  ```

**安全的参数处理模式：**
+ **在 DataFrame 操作中使用参数** — Spark DataFrames 可以安全地处理参数值

  ```
  # SAFE - Use parameters in DataFrame operations
  threshold = int(parameters['threshold'])
  filtered_df = input_df.filter(input_df.value > threshold)
  ```
+ **验证参数值**-使用前检查参数是否符合预期格式

  ```
  # SAFE - Validate parameters before use
  def validate_date(date_str):
      try:
          from datetime import datetime
          datetime.strptime(date_str, '%Y-%m-%d')
          return True
      except ValueError:
          return False
  
  date_param = parameters['date_filter'] or '2024-01-01'
  if not validate_date(date_param):
      raise ValueError(f"Invalid date format: {date_param}")
  ```
+ **对参数值使用许可名单**-如果可能，请根据已知的正确值验证参数

  ```
  # SAFE - Use allowlists
  allowed_columns = ['column1', 'column2', 'column3']
  column_param = parameters['column_name']
  if column_param not in allowed_columns:
      raise ValueError(f"Invalid column: {column_param}")
  ```
+ **带错误处理的类型转**换-将字符串参数安全地转换为预期类型

  ```
  # SAFE - Convert with error handling
  try:
      batch_size = int(parameters['batch_size'] or '1000')
      if batch_size <= 0 or batch_size > 10000:
          raise ValueError(f"Batch size must be between 1 and 10000")
  except ValueError as e:
      print(f"Invalid parameter: {e}")
      raise
  ```

**重要**  
请记住，当作业运行器提供不同的值时，参数会绕过代码审查。无论提供什么参数值，都要设计出可以安全运行的参数处理。

## 完整参数示例
<a name="parameter-examples"></a>

**Example 在 PySpark 脚本中安全地使用参数**  

```
def entrypoint(context):
    try:
        # Access Spark session and tables
        spark = context['sparkSession']
        input_table = context['referencedTables']['sales_data']
        
        # Access parameters - fail fast if analysisParameters missing
        parameters = context['analysisParameters']
        
        # Validate and convert numeric parameter (handles empty strings with default)
        try:
            threshold = int(parameters['threshold'] or '100')
            if threshold <= 0:
                raise ValueError("Threshold must be positive")
        except (ValueError, TypeError) as e:
            print(f"Invalid threshold parameter: {e}")
            raise
        
        # Validate date parameter (handles empty strings with default)
        date_filter = parameters['start_date'] or '2024-01-01'
        from datetime import datetime
        try:
            datetime.strptime(date_filter, '%Y-%m-%d')
        except ValueError:
            raise ValueError(f"Invalid date format: {date_filter}")
        
        # Use parameters safely in DataFrame operations
        filtered_df = input_table.filter(
            (input_table.amount > threshold) &
            (input_table.date >= date_filter)
        )
        
        result_df = filtered_df.groupBy("category").agg(
            {"amount": "sum"}
        )
        
        return {
            "results": {
                "filtered_results": result_df
            }
        }
    
    except Exception as e:
        print(f"Error in analysis: {str(e)}")
        raise
```